From 026d2e120db88145404aac4e3ad99c914c587af2 Mon Sep 17 00:00:00 2001 From: MrLolthe1st Date: Wed, 20 Dec 2023 10:12:03 +0000 Subject: [PATCH 1/7] YQL-9517: RPC Arrow reader YT column converters --- .../common/dq/yql_dq_integration_impl.cpp | 14 - .../yt/comp_nodes/dq/arrow_converter.cpp | 579 ++++++++++++++++++ .../yt/comp_nodes/dq/arrow_converter.h | 22 + .../yt/comp_nodes/dq/dq_yt_block_reader.cpp | 245 +++++--- .../yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp | 3 +- .../provider/ut/yql_yt_dq_integration_ut.cpp | 3 +- ydb/library/yql/providers/yt/provider/ya.make | 1 + .../yt/provider/yql_yt_dq_integration.cpp | 19 +- .../yt/provider/yql_yt_dq_integration.h | 3 +- .../yt/provider/yql_yt_mkql_compiler.cpp | 2 +- .../providers/yt/provider/yql_yt_provider.cpp | 2 +- 11 files changed, 771 insertions(+), 122 deletions(-) create mode 100644 ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp create mode 100644 ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.h diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 30741cde640b..de9baf9513c1 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -22,20 +22,6 @@ TMaybe TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVectorGetItems()) { - // Check type - auto type = e->GetItemType(); - while (ETypeAnnotationKind::Optional == type->GetKind()) { - type = type->Cast()->GetItemType(); - } - if (ETypeAnnotationKind::Data != type->GetKind()) { - return false; - } - } - return true; -} - TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) { return read; } diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp new file mode 100644 index 000000000000..6a6413d72616 --- /dev/null +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -0,0 +1,579 @@ +#include "arrow_converter.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +namespace NYql::NDqs { + +template +struct TypeHelper { + using Type = T; +}; + +#define GEN_TYPE(type)\ + NumericConverterImpl + +#define GEN_TYPE_STR(type)\ + StringConverterImpl + +template +arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr block) { + if constexpr (!IsDictionary) { + typename ::arrow::TypeTraits::ArrayType val(block); // checking for compatibility + arrow::UInt32Array w; + if (val.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (val.IsNull(i)) { + builder->Add(NUdf::TBlockItem{}); + } else { + builder->Add(NUdf::TBlockItem(val.Value(i))); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + builder->Add(NUdf::TBlockItem(val.Value(i))); + } + } + return builder->Build(false); + } + arrow::DictionaryArray dict(block); + typename ::arrow::TypeTraits::ArrayType val(dict.dictionary()->data()); + auto data = dict.indices()->data()->GetValues(1); + if (dict.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (dict.IsNull(i)) { + builder->Add(NUdf::TBlockItem{}); + } else { + builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + } + } + return builder->Build(false); +} + +template +arrow::Datum StringConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr block) { + if constexpr (!IsDictionary) { + typename ::arrow::TypeTraits::ArrayType val(block); // checking for compatibility + if (val.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (val.IsNull(i)) { + builder->Add(NUdf::TBlockItem{}); + } else { + i32 len; + auto ptr = reinterpret_cast(val.GetValue(i, &len)); + builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + i32 len; + auto ptr = reinterpret_cast(val.GetValue(i, &len)); + builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); + } + } + return builder->Build(false); + } + arrow::DictionaryArray dict(block); + typename ::arrow::TypeTraits::ArrayType val(dict.dictionary()->data()); + auto data = dict.indices()->data()->GetValues(1); + if (dict.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (dict.IsNull(i)) { + builder->Add(NUdf::TBlockItem{}); + } else { + i32 len; + auto ptr = reinterpret_cast(val.GetValue(data[i], &len)); + builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + i32 len; + auto ptr = reinterpret_cast(val.GetValue(data[i], &len)); + builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); + } + } + return builder->Build(false); +} + +using namespace NKikimr::NMiniKQL; +using namespace NYson::NDetail; + +class TYsonReaderDetails { +public: + TYsonReaderDetails(const std::string_view& s) : Data_(s.data()), Available_(s.size()) {} + + constexpr char Next() { + YQL_ENSURE(Available_-- > 0); + return *(++Data_); + } + + constexpr char Current() { + return *Data_; + } + + template + constexpr T ReadVarSlow() { + T shift = 0; + T value = Current() & 0x7f; + for (;;) { + shift += 7; + value |= T(Next() & 0x7f) << shift; + if (!(Current() & 0x80)) { + break; + } + } + Next(); + return value; + } + + ui32 ReadVarUI32() { + char prev = Current(); + if (Y_LIKELY(!(prev & 0x80))) { + Next(); + return prev; + } + + return ReadVarSlow(); + } + + ui64 ReadVarUI64() { + char prev = Current(); + if (Y_LIKELY(!(prev & 0x80))) { + Next(); + return prev; + } + + return ReadVarSlow(); + } + + i32 ReadVarI32() { + return NYson::ZigZagDecode32(ReadVarUI32()); + } + + i64 ReadVarI64() { + return NYson::ZigZagDecode64(ReadVarUI64()); + } + + double NextDouble() { + double val = *reinterpret_cast(Data_); + Data_ += sizeof(double); + return val; + } + + void Skip(i32 cnt) { + Data_ += cnt; + } + + const char* Data() { + return Data_; + } +private: + const char* Data_; + size_t Available_; +}; + +class IYsonBlockReader { +public: + virtual NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) = 0; + virtual ~IYsonBlockReader() = default; +}; + +template +class IYsonBlockReaderWithNativeFlag : public IYsonBlockReader { +public: + virtual NUdf::TBlockItem GetNotNull(TYsonReaderDetails&) = 0; + NUdf::TBlockItem GetNullableItem(TYsonReaderDetails& buf) { + char prev = buf.Current(); + if constexpr (Native) { + if (prev == EntitySymbol) { + buf.Next(); + return NUdf::TBlockItem(); + } + return GetNotNull(buf).MakeOptional(); + } + buf.Next(); + if (prev == EntitySymbol) { + return NUdf::TBlockItem(); + } + YQL_ENSURE(prev == BeginListSymbol); + auto result = GetNotNull(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + YQL_ENSURE(buf.Current() == EndListSymbol); + buf.Next(); + return result.MakeOptional(); + } +private: +}; + +template +class TYsonTupleBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + TYsonTupleBlockReader(TVector>&& children) + : Children_(std::move(children)) + , Items_(Children_.size()) + {} + + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + YQL_ENSURE(buf.Current() == BeginListSymbol); + buf.Next(); + for (ui32 i = 0; i < Children_.size(); ++i) { + Items_[i] = Children_[i]->GetItem(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + } + YQL_ENSURE(buf.Current() == EndListSymbol); + buf.Next(); + return NUdf::TBlockItem(Items_.data()); + } +private: + const TVector> Children_; + TVector Items_; +}; + +template +class TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + YQL_ENSURE(buf.Current() == StringMarker); + buf.Next(); + const i32 length = buf.ReadVarI32(); + auto res = NUdf::TBlockItem(NUdf::TStringRef(buf.Data(), length)); + buf.Skip(length); + return res; + } +private: + const TVector> Children_; + TVector Items_; +}; + +namespace { +struct TYtColumnConverterSettings { + TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative); + NKikimr::NMiniKQL::TType* Type; + const NUdf::IPgBuilder* PgBuilder; + arrow::MemoryPool& Pool; + bool IsNative; + bool IsTopOptional = false; + std::shared_ptr ArrowType; + std::unique_ptr Builder; +}; +} + +template +class TYsonFixedSizeBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + if constexpr (std::is_same_v) { + YQL_ENSURE(buf.Current() == FalseMarker || buf.Current() == TrueMarker); + bool res = buf.Current() == TrueMarker; + buf.Next(); + return NUdf::TBlockItem(res); + } + + if constexpr (std::is_same_v) { + if (buf.Current() == FalseMarker || buf.Current() == TrueMarker) { + bool res = buf.Current() == TrueMarker; + buf.Next(); + return NUdf::TBlockItem(T(res)); + } + } + + if constexpr (std::is_integral_v) { + if constexpr (std::is_signed_v) { + YQL_ENSURE(buf.Current() == Int64Marker); + buf.Next(); + return NUdf::TBlockItem(T(buf.ReadVarI64())); + } else { + YQL_ENSURE(buf.Current() == Uint64Marker); + buf.Next(); + return NUdf::TBlockItem(T(buf.ReadVarUI64())); + } + } + + YQL_ENSURE(buf.Current() == DoubleMarker); + buf.Next(); + return NUdf::TBlockItem(T(buf.NextDouble())); + } +private: + const TVector> Children_; + TVector Items_; +}; + +template +class TYsonExternalOptBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + TYsonExternalOptBlockReader(std::unique_ptr&& inner) + : Inner_(std::move(inner)) + {} + + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) final { + char prev = buf.Current(); + buf.Next(); + if (prev == EntitySymbol) { + return NUdf::TBlockItem(); + } + YQL_ENSURE(prev == BeginListSymbol); + if constexpr (!Native) { + if (buf.Current() == EndListSymbol) { + buf.Next(); + return NUdf::TBlockItem(); + } + } + auto result = Inner_->GetItem(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + YQL_ENSURE(buf.Current() == EndListSymbol); + buf.Next(); + return result.MakeOptional(); + } + + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + YQL_ENSURE(false, "Can't be called"); + } +private: + std::unique_ptr Inner_; +}; + +template +struct TYsonBlockReaderTraits { + using TResult = IYsonBlockReader; + template + using TTuple = TYsonTupleBlockReader; + template + using TFixedSize = TYsonFixedSizeBlockReader; + template + using TStrings = TYsonStringBlockReader; + using TExtOptional = TYsonExternalOptBlockReader; + + static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { + Y_UNUSED(pgBuilder); + if (desc.PassByValue) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } + +}; + +template +class TPrimitiveColumnConverter { +public: + TPrimitiveColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) { + switch (Settings_.ArrowType->id()) { + case arrow::Type::INT8: PrimitiveConverterImpl_ = GEN_TYPE(Int8); break; + case arrow::Type::UINT8: PrimitiveConverterImpl_ = GEN_TYPE(UInt8); break; + case arrow::Type::INT16: PrimitiveConverterImpl_ = GEN_TYPE(Int16); break; + case arrow::Type::UINT16: PrimitiveConverterImpl_ = GEN_TYPE(UInt16); break; + case arrow::Type::INT32: PrimitiveConverterImpl_ = GEN_TYPE(Int32); break; + case arrow::Type::UINT32: PrimitiveConverterImpl_ = GEN_TYPE(UInt32); break; + case arrow::Type::INT64: PrimitiveConverterImpl_ = GEN_TYPE(Int64); break; + case arrow::Type::UINT64: PrimitiveConverterImpl_ = GEN_TYPE(UInt64); break; + case arrow::Type::DOUBLE: PrimitiveConverterImpl_ = GEN_TYPE(Double); break; + case arrow::Type::FLOAT: PrimitiveConverterImpl_ = GEN_TYPE(Float); break; + case arrow::Type::STRING: PrimitiveConverterImpl_ = GEN_TYPE_STR(String); break; + case arrow::Type::BINARY: PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break; + default: + return; // will check in runtime + }; + } + arrow::Datum Convert(std::shared_ptr block) { + return PrimitiveConverterImpl_(Settings_.Builder.get(), block); + } +private: + TYtColumnConverterSettings& Settings_; + arrow::Datum (*PrimitiveConverterImpl_)(NUdf::IArrayBuilder*, std::shared_ptr); +}; + +template +class TYtYsonColumnConverter { +public: + TYtYsonColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) { + Reader_ = NUdf::MakeBlockReaderImpl>(TTypeInfoHelper(), settings.Type, settings.PgBuilder); + } + + arrow::Datum Convert(std::shared_ptr block) { + if constexpr(!IsDictionary) { + arrow::BinaryArray binary(block); + if (block->GetNullCount()) { + for (i64 i = 0; i < block->length; ++i) { + if (binary.IsNull(i)) { + Settings_.Builder->Add(NUdf::TBlockItem{}); + } else { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(i, &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(i, &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + return Settings_.Builder->Build(false); + } + arrow::DictionaryArray dict(block); + arrow::BinaryArray binary(block->dictionary); + auto data = dict.indices()->data()->GetValues(1); + if (dict.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (dict.IsNull(i)) { + Settings_.Builder->Add(NUdf::TBlockItem{}); + } else { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(data[i], &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(data[i], &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + return Settings_.Builder->Build(false); + } + +private: + std::shared_ptr::TResult> Reader_; + TYtColumnConverterSettings& Settings_; +}; + +template +class TYtColumnConverter final : public IYtColumnConverter { +public: + TYtColumnConverter(TYtColumnConverterSettings&& settings) + : Settings_(std::move(settings)) + , DictYsonConverter_(Settings_) + , YsonConverter_(Settings_) + , DictPrimitiveConverter_(Settings_) + , PrimitiveConverter_(Settings_) {} + + arrow::Datum Convert(std::shared_ptr block) override { + if (arrow::Type::DICTIONARY == block->type->id()) { + if (static_cast(*block->type).value_type()->Equals(Settings_.ArrowType)) { + return DictPrimitiveConverter_.Convert(block); + } else { + return DictYsonConverter_.Convert(block); + } + } else { + if (block->type->Equals(Settings_.ArrowType)) { + return PrimitiveConverter_.Convert(block); + } else { + return block; + } + } + } +private: + TYtColumnConverterSettings Settings_; + TYtYsonColumnConverter DictYsonConverter_; + TYtYsonColumnConverter YsonConverter_; + TPrimitiveColumnConverter DictPrimitiveConverter_; + TPrimitiveColumnConverter PrimitiveConverter_; +}; + +TYtColumnConverterSettings::TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative) + : Type(type), PgBuilder(pgBuilder), Pool(pool), IsNative(isNative) +{ + if (!isNative) { + if (Type->IsOptional()) { + IsTopOptional = true; + Type = static_cast(Type)->GetItemType(); + } + } + YQL_ENSURE(ConvertArrowType(type, ArrowType), "Can't convert type to arrow"); + size_t maxBlockItemSize = CalcMaxBlockItemSize(type); + size_t maxBlockLen = CalcBlockLen(maxBlockItemSize); + Builder = std::move(NUdf::MakeArrayBuilder( + TTypeInfoHelper(), type, + pool, + maxBlockLen, + pgBuilder + )); +} + +template typename T, typename Args, bool... Acc> +struct TBoolDispatcher { + + std::unique_ptr Dispatch(Args&& args) const { + return std::make_unique>(std::forward(args)); + } + + template + auto Dispatch(Args&& args, bool head, Bools... tail) const { + return head ? + TBoolDispatcher().Dispatch(std::forward(args), tail...) : + TBoolDispatcher().Dispatch(std::forward(args), tail...); + } +}; + +std::unique_ptr MakeYtColumnConverter(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative) { + TYtColumnConverterSettings settings(type, pgBuilder, pool, isNative); + bool isTopOptional = settings.IsTopOptional; + return TBoolDispatcher().Dispatch(std::move(settings), isNative, isTopOptional); +} +} diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.h b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.h new file mode 100644 index 000000000000..aed8b5f311df --- /dev/null +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.h @@ -0,0 +1,22 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NYql::NDqs { + +class IYtColumnConverter { +public: + virtual arrow::Datum Convert(std::shared_ptr block) = 0; + virtual ~IYtColumnConverter() = default; +}; + +std::unique_ptr MakeYtColumnConverter(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative); +} diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp index 923deacc08c6..d380ae4ef9f9 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp @@ -1,6 +1,7 @@ #include "dq_yt_block_reader.h" #include "stream_decoder.h" #include "dq_yt_rpc_helpers.h" +#include "arrow_converter.h" #include @@ -8,14 +9,15 @@ #include #include #include +#include #include +#include #include #include -#include -#include -#include +#include #include +#include #include #include #include @@ -30,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -45,13 +48,16 @@ namespace NYql::NDqs { using namespace NKikimr::NMiniKQL; +TStatKey FallbackCount("YtBlockReader_Fallbacks", true); +TStatKey BlockCount("YtBlockReader_Blocks", true); + namespace { struct TResultBatch { using TPtr = std::shared_ptr; size_t RowsCnt; std::vector Columns; + TResultBatch(int64_t cnt) : RowsCnt(cnt) {} TResultBatch(int64_t cnt, decltype(Columns)&& columns) : RowsCnt(cnt), Columns(std::move(columns)) {} - TResultBatch(std::shared_ptr batch) : RowsCnt(batch->num_rows()), Columns(batch->columns().begin(), batch->columns().end()) {} }; template @@ -59,27 +65,35 @@ class TBlockingQueueWithLimit { struct Poison { TString Error; }; - using TPoisonOr = std::variant; + struct TFallbackNotify {}; + using TPoisonOr = std::variant; public: TBlockingQueueWithLimit(size_t limit) : Limit_(limit) {} - void Push(T&& val) { - PushInternal(std::move(val)); + void Push(T&& val, bool imm) { + PushInternal(std::move(val), imm); } void PushPoison(const TString& err) { - PushInternal(Poison{err}); + PushInternal(Poison{err}, true); + } + + void PushNotify() { + PushInternal(TFallbackNotify{}, true); } - T Get() { + TMaybe Get() { auto res = GetInternal(); if (std::holds_alternative(res)) { throw std::runtime_error(std::get(res).Error); } + if (std::holds_alternative(res)) { + return {}; + } return std::move(std::get(res)); } private: template - void PushInternal(X&& val) { + void PushInternal(X&& val, bool imm) { NYT::TPromise promise; { std::lock_guard _(Mtx_); @@ -88,7 +102,7 @@ class TBlockingQueueWithLimit { Awaiting_.pop(); return; } - if (Ready_.size() >= Limit_) { + if (!imm && Ready_.size() >= Limit_) { promise = NYT::NewPromise(); BlockedPushes_.push(promise); } else { @@ -135,7 +149,9 @@ class TListener { public: using TPromise = NYT::TPromise; using TPtr = std::shared_ptr; - TListener(size_t initLatch, size_t inflight) : Latch_(initLatch), Queue_(inflight) {} + TListener(size_t initLatch, size_t inflight) + : Latch_(initLatch) + , Queue_(inflight) {} void OnEOF() { bool excepted = 0; @@ -149,19 +165,11 @@ class TListener { } // Handles result - void HandleResult(TResultBatch::TPtr&& res) { - Queue_.Push(std::move(res)); - } - - void OnRecordBatchDecoded(TBatchPtr record_batch) { - YQL_ENSURE(record_batch); - // decode dictionary - record_batch = NKikimr::NArrow::DictionaryToArray(record_batch); - // and handle result - HandleResult(std::make_shared(record_batch)); + void HandleResult(TResultBatch::TPtr&& res, bool immediatly = false) { + Queue_.Push(std::move(res), immediatly); } - TResultBatch::TPtr Get() { + TMaybe Get() { return Queue_.Get(); } @@ -170,7 +178,11 @@ class TListener { } void HandleFallback(TResultBatch::TPtr&& block) { - HandleResult(std::move(block)); + HandleResult(std::move(block), true); + } + + void NotifyFallback() { + Queue_.PushNotify(); } void InputDone() { @@ -267,8 +279,16 @@ class TBlockBuilder { class TLocalListener : public arrow::ipc::Listener { public: - TLocalListener(std::shared_ptr consumer) - : Consumer_(consumer) {} + TLocalListener(std::shared_ptr consumer, std::shared_ptr> columnTypes, std::shared_ptr>> arrowTypes, arrow::MemoryPool& pool, const NUdf::IPgBuilder* pgBuilder, bool isNative, NKikimr::NMiniKQL::IStatsRegistry* jobStats) + : Consumer_(consumer) + , ColumnTypes_(columnTypes) + , JobStats_(jobStats) + { + ColumnConverters_.reserve(columnTypes->size()); + for (size_t i = 0; i < columnTypes->size(); ++i) { + ColumnConverters_.emplace_back(MakeYtColumnConverter(columnTypes->at(i), pgBuilder, pool, isNative)); + } + } void Init(std::shared_ptr self) { Self_ = self; @@ -281,7 +301,14 @@ class TLocalListener : public arrow::ipc::Listener { } arrow::Status OnRecordBatchDecoded(std::shared_ptr batch) override { - Consumer_->OnRecordBatchDecoded(batch); + YQL_ENSURE(batch); + MKQL_ADD_STAT(JobStats_, BlockCount, 1); + std::vector result; + result.reserve(ColumnConverters_.size()); + for (size_t i = 0; i < ColumnConverters_.size(); ++i) { + result.emplace_back(ColumnConverters_[i]->Convert(batch->column(i)->data())); + } + Consumer_->HandleResult(std::make_shared(batch->num_rows(), std::move(result))); return arrow::Status::OK(); } @@ -296,17 +323,21 @@ class TLocalListener : public arrow::ipc::Listener { std::shared_ptr Self_; std::shared_ptr Consumer_; std::shared_ptr Decoder_; + std::shared_ptr> ColumnTypes_; + NKikimr::NMiniKQL::IStatsRegistry* JobStats_; + std::vector> ColumnConverters_; }; class TSource : public TNonCopyable { public: using TPtr = std::shared_ptr; TSource(std::unique_ptr&& settings, - size_t inflight, TType* type, const THolderFactory& holderFactory) + size_t inflight, TType* type, std::shared_ptr>> types, const THolderFactory& holderFactory, NKikimr::NMiniKQL::IStatsRegistry* jobStats) : Settings_(std::move(settings)) , Inputs_(std::move(Settings_->RawInputs)) , Listener_(std::make_shared(Inputs_.size(), inflight)) , HolderFactory_(holderFactory) + , JobStats_(jobStats) { auto structType = AS_TYPE(TStructType, type); std::vector columnTypes_(structType->GetMembersCount()); @@ -319,7 +350,9 @@ class TSource : public TNonCopyable { LocalListeners_.reserve(Inputs_.size()); for (size_t i = 0; i < Inputs_.size(); ++i) { InputsQueue_.emplace(i); - LocalListeners_.emplace_back(std::make_shared(Listener_)); + auto& decoder = Settings_->Specs->Inputs[Settings_->OriginalIndexes[i]]; + bool native = decoder->NativeYtTypeFlags && !decoder->FieldsVec[i].ExplicitYson; + LocalListeners_.emplace_back(std::make_shared(Listener_, ptr, types, *Settings_->Pool, Settings_->PgBuilder, native, jobStats)); LocalListeners_.back()->Init(LocalListeners_.back()); } BlockBuilder_.Init(ptr, *Settings_->Pool, Settings_->PgBuilder); @@ -336,8 +369,9 @@ class TSource : public TNonCopyable { inputIdx = InputsQueue_.front(); InputsQueue_.pop(); } + Inputs_[inputIdx]->Read().SubscribeUnique(BIND([inputIdx = inputIdx, self = Self_](NYT::TErrorOr&& res) { - self->Pool_->GetInvoker()->Invoke(BIND([inputIdx, self, res = std::move(res)] () mutable { + self->Pool_->GetInvoker()->Invoke(BIND([inputIdx, self, res = std::move(res)]() mutable { try { self->Accept(inputIdx, std::move(res)); self->RunRead(); @@ -365,23 +399,13 @@ class TSource : public TNonCopyable { NYT::NApi::NRpcProxy::NProto::TRowsetStatistics statistics; NYT::TSharedRef currentPayload = NYT::NApi::NRpcProxy::DeserializeRowStreamBlockEnvelope(res.Value(), &descriptor, &statistics); if (descriptor.rowset_format() != NYT::NApi::NRpcProxy::NProto::RF_ARROW) { - auto promise = NYT::NewPromise>(); - MainInvoker_->Invoke(BIND([inputIdx, currentPayload, self = Self_, promise] { - try { - promise.Set(self->FallbackHandler(inputIdx, currentPayload)); - } catch (std::exception& e) { - promise.Set(NYT::TError(e.what())); - } - })); - auto result = NYT::NConcurrency::WaitFor(promise.ToFuture()); - if (!result.IsOK()) { - Listener_->HandleError(result.GetMessage()); - return; - } - for (auto& e: result.Value()) { - Listener_->HandleFallback(std::move(e)); + if (currentPayload.Size()) { + std::lock_guard _(FallbackMtx_); + Fallbacks_.push({inputIdx, currentPayload}); + } else { + InputDone(inputIdx); } - InputDone(inputIdx); + Listener_->NotifyFallback(); return; } @@ -406,18 +430,38 @@ class TSource : public TNonCopyable { } TResultBatch::TPtr Next() { - auto result = Listener_->Get(); - return result; + for(;;) { + size_t inputIdx = 0; + NYT::TSharedRef payload; + { + std::lock_guard _(FallbackMtx_); + if (Fallbacks_.size()) { + inputIdx = Fallbacks_.front().first; + payload = Fallbacks_.front().second; + Fallbacks_.pop(); + } + } + if (payload) { + for (auto &e: FallbackHandler(inputIdx, payload)) { + Listener_->HandleFallback(std::move(e)); + } + InputDone(inputIdx); + RunRead(); + } + auto result = Listener_->Get(); + if (!result) { // Falled back + continue; + } + return *result; + } } std::vector FallbackHandler(size_t idx, NYT::TSharedRef payload) { if (!payload.Size()) { return {}; } - // We're have only one mkql reader, protect it if 2 fallbacks happen at the same time - std::lock_guard _(FallbackMtx_); auto currentReader_ = std::make_shared(std::move(payload)); - + MKQL_ADD_STAT(JobStats_, FallbackCount, 1); // TODO(): save and recover row indexes FallbackReader_.SetReader(*currentReader_, 1, 4_MB, ui32(Settings_->OriginalIndexes[idx]), true); // If we don't save the reader, after exiting FallbackHandler it will be destroyed, @@ -445,9 +489,8 @@ class TSource : public TNonCopyable { } void SetSelfAndRun(TPtr self) { - MainInvoker_ = NYT::GetCurrentInvoker(); Self_ = self; - Pool_ = NYT::NConcurrency::CreateThreadPool(Inflight_, "rpc_reader_inflight"); + Pool_ = NYT::NConcurrency::CreateThreadPool(Inflight_, "block_reader"); // Run Inflight_ reads at the same time for (size_t i = 0; i < Inflight_; ++i) { RunRead(); @@ -455,10 +498,11 @@ class TSource : public TNonCopyable { } private: - NYT::IInvoker* MainInvoker_; NYT::NConcurrency::IThreadPoolPtr Pool_; std::mutex Mtx_; std::mutex FallbackMtx_; + std::queue> Fallbacks_; + std::queue FallbackBlocks_; std::unique_ptr Settings_; std::vector> LocalListeners_; std::vector Inputs_; @@ -470,26 +514,27 @@ class TSource : public TNonCopyable { TPtr Self_; size_t Inflight_; const THolderFactory& HolderFactory_; + NKikimr::NMiniKQL::IStatsRegistry* JobStats_; }; -class TState: public TComputationValue { - using TBase = TComputationValue; +class TReaderState: public TComputationValue { + using TBase = TComputationValue; public: - TState(TMemoryUsageInfo* memInfo, TSource::TPtr source, size_t width, TType* type) + TReaderState(TMemoryUsageInfo* memInfo, TSource::TPtr source, size_t width, std::shared_ptr>> arrowTypes) : TBase(memInfo) , Source_(std::move(source)) , Width_(width) - , Type_(type) - , Types_(width) + , Types_(arrowTypes) { - for (size_t i = 0; i < Width_; ++i) { - Types_[i] = NArrow::GetArrowType(AS_TYPE(TStructType, Type_)->GetMemberType(i)); - } } - + EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + if (GotFinish_) { + return EFetchResult::Finish; + } auto batch = Source_->Next(); if (!batch) { + GotFinish_ = 1; Source_->Finish(); return EFetchResult::Finish; } @@ -497,8 +542,8 @@ class TState: public TComputationValue { if (!output[i]) { continue; } - if(!batch->Columns[i].type()->Equals(Types_[i])) { - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(ARROW_RESULT(arrow::compute::Cast(batch->Columns[i], Types_[i]))); + if(!batch->Columns[i].type()->Equals(Types_->at(i))) { + *(output[i]) = ctx.HolderFactory.CreateArrowBlock(ARROW_RESULT(arrow::compute::Cast(batch->Columns[i], Types_->at(i)))); continue; } *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(batch->Columns[i])); @@ -512,65 +557,71 @@ class TState: public TComputationValue { private: TSource::TPtr Source_; const size_t Width_; - TType* Type_; - std::vector> Types_; + std::shared_ptr>> Types_; + bool GotFinish_ = 0; }; }; class TDqYtReadBlockWrapper : public TStatefulWideFlowComputationNode { using TBaseComputation = TStatefulWideFlowComputationNode; public: - + TDqYtReadBlockWrapper(const TComputationNodeFactoryContext& ctx, const TString& clusterName, const TString& token, const NYT::TNode& inputSpec, const NYT::TNode& samplingSpec, const TVector& inputGroups, TType* itemType, const TVector& tableNames, TVector>&& tables, NKikimr::NMiniKQL::IStatsRegistry* jobStats, size_t inflight, size_t timeout) : TBaseComputation(ctx.Mutables, this, EValueRepresentation::Boxed) - , Width(AS_TYPE(TStructType, itemType)->GetMembersCount()) - , CodecCtx(ctx.Env, ctx.FunctionRegistry, &ctx.HolderFactory) - , ClusterName(clusterName) - , Token(token) - , SamplingSpec(samplingSpec) - , Tables(std::move(tables)) - , Inflight(inflight) - , Timeout(timeout) - , Type(itemType) + , Width_(AS_TYPE(TStructType, itemType)->GetMembersCount()) + , CodecCtx_(ctx.Env, ctx.FunctionRegistry, &ctx.HolderFactory) + , ClusterName_(clusterName) + , Token_(token) + , SamplingSpec_(samplingSpec) + , Tables_(std::move(tables)) + , Inflight_(inflight) + , Timeout_(timeout) + , Type_(itemType) + , JobStats_(jobStats) { - // TODO() Enable range indexes - Specs.SetUseSkiff("", TMkqlIOSpecs::ESystemField::RowIndex); - Specs.Init(CodecCtx, inputSpec, inputGroups, tableNames, itemType, {}, {}, jobStats); + // TODO() Enable range indexes + row indexes + Specs_.SetUseSkiff("", 0); + Specs_.Init(CodecCtx_, inputSpec, inputGroups, tableNames, itemType, {}, {}, jobStats); } void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - auto settings = CreateInputStreams(true, Token, ClusterName, Timeout, Inflight > 1, Tables, SamplingSpec); - settings->Specs = &Specs; + auto settings = CreateInputStreams(true, Token_, ClusterName_, Timeout_, Inflight_ > 1, Tables_, SamplingSpec_); + settings->Specs = &Specs_; settings->Pool = arrow::default_memory_pool(); settings->PgBuilder = &ctx.Builder->GetPgBuilder(); - auto source = std::make_shared(std::move(settings), Inflight, Type, ctx.HolderFactory); + auto types = std::make_shared>>(Width_); + for (size_t i = 0; i < Width_; ++i) { + YQL_ENSURE(ConvertArrowType(AS_TYPE(TStructType, Type_)->GetMemberType(i), types->at(i)), "Can't convert type to arrow"); + } + auto source = std::make_shared(std::move(settings), Inflight_, Type_, types, ctx.HolderFactory, JobStats_); source->SetSelfAndRun(source); - state = ctx.HolderFactory.Create(source, Width, Type); + state = ctx.HolderFactory.Create(source, Width_, types); } EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { if (!state.HasValue()) { MakeState(ctx, state); } - return static_cast(*state.AsBoxed()).FetchValues(ctx, output); + return static_cast(*state.AsBoxed()).FetchValues(ctx, output); } void RegisterDependencies() const final {} - - const ui32 Width; - NCommon::TCodecContext CodecCtx; - TMkqlIOSpecs Specs; - - TString ClusterName; - TString Token; - NYT::TNode SamplingSpec; - TVector> Tables; - size_t Inflight; - size_t Timeout; - TType* Type; +private: + const ui32 Width_; + NCommon::TCodecContext CodecCtx_; + TMkqlIOSpecs Specs_; + + TString ClusterName_; + TString Token_; + NYT::TNode SamplingSpec_; + TVector> Tables_; + size_t Inflight_; + size_t Timeout_; + TType* Type_; + NKikimr::NMiniKQL::IStatsRegistry* JobStats_; }; IComputationNode* CreateDqYtReadBlockWrapper(const TComputationNodeFactoryContext& ctx, const TString& clusterName, diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp index 560168730804..b801aa8ef5d2 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp @@ -87,7 +87,8 @@ std::unique_ptr CreateInputStreams(bool isArrow, const TString& request->set_arrow_fallback_rowset_format(NYT::NApi::NRpcProxy::NProto::ERowsetFormat::RF_FORMAT); } - request->set_enable_row_index(true); + // TODO() Enable row indexes + request->set_enable_row_index(!isArrow); request->set_enable_table_index(true); // TODO() Enable range indexes request->set_enable_range_index(!isArrow); diff --git a/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp b/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp index 9fd0ea839d53..d5d2076cfde5 100644 --- a/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp +++ b/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp @@ -13,7 +13,8 @@ struct TTestSetup { , State(MakeIntrusive()) { State->Types = TypesCtx.Get(); - State->DqIntegration_ = CreateYtDqIntegration(State.Get()); + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::IBuiltinFunctionRegistry::TPtr(nullptr)); + State->DqIntegration_ = CreateYtDqIntegration(State.Get(), functionRegistry.Get()); } diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make index 3c132cc5f675..13608af4f110 100644 --- a/ydb/library/yql/providers/yt/provider/ya.make +++ b/ydb/library/yql/providers/yt/provider/ya.make @@ -73,6 +73,7 @@ PEERDIR( ydb/library/yql/providers/common/mkql ydb/library/yql/providers/common/proto ydb/library/yql/providers/common/activation + ydb/library/yql/providers/common/arrow_resolve ydb/library/yql/providers/common/provider ydb/library/yql/providers/common/schema/expr ydb/library/yql/providers/common/transform diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 6bded487cc27..d567fbadba43 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -39,8 +40,9 @@ using namespace NNodes; class TYtDqIntegration: public TDqIntegrationBase { public: - TYtDqIntegration(TYtState* state) + TYtDqIntegration(TYtState* state, const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry) : State_(state) + , ArrowResolver_(MakeSimpleArrowResolver(functionRegistry)) { } @@ -358,20 +360,24 @@ class TYtDqIntegration: public TDqIntegrationBase { return false; } - bool CanBlockRead(const NNodes::TExprBase& node, TExprContext&, TTypeAnnotationContext&) override { + bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&) override { auto wrap = node.Cast(); auto maybeRead = wrap.Input().Maybe(); if (!maybeRead) { return false; } - if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) { return false; } const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast()->GetItems().back())->Cast(); - if (!CanBlockReadTypes(structType)) { + TVector subTypeAnn(Reserve(structType->GetItems().size())); + for (const auto& type: structType->GetItems()) { + subTypeAnn.emplace_back(type->GetItemType()); + } + + if (ArrowResolver_->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) { return false; } @@ -669,11 +675,12 @@ class TYtDqIntegration: public TDqIntegrationBase { private: TYtState* State_; + IArrowResolver::TPtr ArrowResolver_; }; -THolder CreateYtDqIntegration(TYtState* state) { +THolder CreateYtDqIntegration(TYtState* state, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) { Y_ABORT_UNLESS(state); - return MakeHolder(state); + return MakeHolder(state, *functionRegistry); } } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h index e4cfac09839b..67ca368005bd 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h @@ -3,11 +3,12 @@ #include "yql_yt_provider.h" #include +#include #include namespace NYql { -THolder CreateYtDqIntegration(TYtState* state); +THolder CreateYtDqIntegration(TYtState* state, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp index e4d97ea43bee..d4cf3f1d5a68 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp @@ -369,7 +369,7 @@ TRuntimeNode BuildDqYtInputCall( } tablesNode.Add(refName); // TODO() Enable range indexes - auto skiffNode = SingleTableSpecToInputSkiff(specNode, structColumns, true, !enableBlockReader, false); + auto skiffNode = SingleTableSpecToInputSkiff(specNode, structColumns, !enableBlockReader, !enableBlockReader, false); const auto tmpFolder = GetTablesTmpFolder(*state->Configuration); auto tableName = pathInfo.Table->Name; if (pathInfo.Table->IsAnonymous && !TYtTableInfo::HasSubstAnonymousLabel(pathInfo.Table->FromNode.Cast())) { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp index 0b7bd3551d7b..a1979fc5891c 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp @@ -356,7 +356,7 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat ytState->SessionId = sessionId; ytState->Gateway = gateway; ytState->Types = typeCtx.Get(); - ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get()); + ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get(), functionRegistry); TStatWriter statWriter = [ytState](ui32 publicId, const TVector& stat) { with_lock(ytState->StatisticsMutex) { From 62c4d4ccc28792408639757cfd2c0f5107bd4d15 Mon Sep 17 00:00:00 2001 From: MrLolthe1st Date: Wed, 20 Dec 2023 10:36:09 +0000 Subject: [PATCH 2/7] .make + bool --- .../yt/comp_nodes/dq/arrow_converter.cpp | 35 +++++++++++++------ .../yql/providers/yt/comp_nodes/dq/ya.make | 3 +- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp index 6a6413d72616..6dd85df185a0 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -32,18 +32,25 @@ template arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr block) { if constexpr (!IsDictionary) { typename ::arrow::TypeTraits::ArrayType val(block); // checking for compatibility - arrow::UInt32Array w; if (val.null_count()) { for (i64 i = 0; i < block->length; ++i) { if (val.IsNull(i)) { builder->Add(NUdf::TBlockItem{}); } else { - builder->Add(NUdf::TBlockItem(val.Value(i))); + if constexpr (std::is_same_v) { + builder->Add(NUdf::TBlockItem((ui8)val.Value(i))); + } else { + builder->Add(NUdf::TBlockItem(val.Value(i))); + } } } } else { for (i64 i = 0; i < block->length; ++i) { - builder->Add(NUdf::TBlockItem(val.Value(i))); + if constexpr (std::is_same_v) { + builder->Add(NUdf::TBlockItem((ui8)val.Value(i))); + } else { + builder->Add(NUdf::TBlockItem(val.Value(i))); + } } } return builder->Build(false); @@ -56,12 +63,20 @@ arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr< if (dict.IsNull(i)) { builder->Add(NUdf::TBlockItem{}); } else { - builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + if constexpr (std::is_same_v) { + builder->Add(NUdf::TBlockItem((ui8)val.Value(data[i]))); + } else { + builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + } } } } else { for (i64 i = 0; i < block->length; ++i) { - builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + if constexpr (std::is_same_v) { + builder->Add(NUdf::TBlockItem((ui8)val.Value(data[i]))); + } else { + builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + } } } return builder->Build(false); @@ -402,6 +417,7 @@ class TPrimitiveColumnConverter { public: TPrimitiveColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) { switch (Settings_.ArrowType->id()) { + case arrow::Type::BOOL: PrimitiveConverterImpl_ = GEN_TYPE(Boolean); break; case arrow::Type::INT8: PrimitiveConverterImpl_ = GEN_TYPE(Int8); break; case arrow::Type::UINT8: PrimitiveConverterImpl_ = GEN_TYPE(UInt8); break; case arrow::Type::INT16: PrimitiveConverterImpl_ = GEN_TYPE(Int16); break; @@ -510,8 +526,7 @@ class TYtColumnConverter final : public IYtColumnConverter { : Settings_(std::move(settings)) , DictYsonConverter_(Settings_) , YsonConverter_(Settings_) - , DictPrimitiveConverter_(Settings_) - , PrimitiveConverter_(Settings_) {} + , DictPrimitiveConverter_(Settings_) {} arrow::Datum Convert(std::shared_ptr block) override { if (arrow::Type::DICTIONARY == block->type->id()) { @@ -522,9 +537,10 @@ class TYtColumnConverter final : public IYtColumnConverter { } } else { if (block->type->Equals(Settings_.ArrowType)) { - return PrimitiveConverter_.Convert(block); - } else { return block; + } else { + YQL_ENSURE(arrow::Type::BINARY == block->type->id()); + return YsonConverter_.Convert(block); } } } @@ -533,7 +549,6 @@ class TYtColumnConverter final : public IYtColumnConverter { TYtYsonColumnConverter DictYsonConverter_; TYtYsonColumnConverter YsonConverter_; TPrimitiveColumnConverter DictPrimitiveConverter_; - TPrimitiveColumnConverter PrimitiveConverter_; }; TYtColumnConverterSettings::TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative) diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make b/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make index 48ae5d6ea10c..cba05d599a94 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make @@ -1,11 +1,11 @@ LIBRARY() PEERDIR( + ydb/library/yql/minikql ydb/library/yql/minikql/computation/llvm ydb/library/yql/providers/yt/comp_nodes ydb/library/yql/providers/yt/codec ydb/library/yql/providers/common/codec - ydb/core/formats/arrow yt/cpp/mapreduce/interface yt/cpp/mapreduce/common library/cpp/yson/node @@ -27,6 +27,7 @@ IF(LINUX) ) SRCS( + arrow_converter.cpp stream_decoder.cpp dq_yt_rpc_reader.cpp dq_yt_rpc_helpers.cpp From 4f60da1ab7cdacacf472b22093fae01800da56e5 Mon Sep 17 00:00:00 2001 From: MrLolthe1st Date: Thu, 21 Dec 2023 14:21:30 +0000 Subject: [PATCH 3/7] wideflow => stream --- ydb/library/yql/providers/dq/opt/dqs_opt.cpp | 12 +++-- .../provider/yql_dq_datasource_type_ann.cpp | 2 + .../yt/comp_nodes/dq/dq_yt_block_reader.cpp | 54 ++++++++----------- 3 files changed, 32 insertions(+), 36 deletions(-) diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index d8ae81155dbd..994de78b22b5 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -92,12 +93,13 @@ namespace NYql::NDqs { } YQL_CLOG(INFO, ProviderDq) << "DqsRewritePhyBlockReadOnDqIntegration"; - return Build(ctx, node->Pos()) - .Input(Build(ctx, node->Pos()) - .Input(readWideWrap.Input()) - .Flags(readWideWrap.Flags()) - .Token(readWideWrap.Token()) + .Input(Build(ctx, node->Pos()) + .Input(Build(ctx, node->Pos()) + .Input(readWideWrap.Input()) + .Flags(readWideWrap.Flags()) + .Token(readWideWrap.Token()) + .Done()) .Done()) .Done().Ptr(); }, ctx, optSettings); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp index b7e18d36a3b0..841e20f13335 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp @@ -155,6 +155,8 @@ class TDqsDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { } types.push_back(ctx.MakeType(ctx.MakeType(EDataSlot::Uint64))); + input->SetTypeAnn(ctx.MakeType(ctx.MakeType(types))); + return TStatus::Ok; } input->SetTypeAnn(ctx.MakeType(ctx.MakeType(types))); diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp index d380ae4ef9f9..777e153e25c5 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp @@ -333,10 +333,10 @@ class TSource : public TNonCopyable { using TPtr = std::shared_ptr; TSource(std::unique_ptr&& settings, size_t inflight, TType* type, std::shared_ptr>> types, const THolderFactory& holderFactory, NKikimr::NMiniKQL::IStatsRegistry* jobStats) - : Settings_(std::move(settings)) + : HolderFactory(holderFactory) + , Settings_(std::move(settings)) , Inputs_(std::move(Settings_->RawInputs)) , Listener_(std::make_shared(Inputs_.size(), inflight)) - , HolderFactory_(holderFactory) , JobStats_(jobStats) { auto structType = AS_TYPE(TStructType, type); @@ -356,7 +356,7 @@ class TSource : public TNonCopyable { LocalListeners_.back()->Init(LocalListeners_.back()); } BlockBuilder_.Init(ptr, *Settings_->Pool, Settings_->PgBuilder); - FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory_); + FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory); } void RunRead() { @@ -471,7 +471,7 @@ class TSource : public TNonCopyable { while (FallbackReader_.IsValid()) { auto currentRow = std::move(FallbackReader_.GetRow()); if (!Settings_->Specs->InputGroups.empty()) { - currentRow = std::move(HolderFactory_.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx]))); + currentRow = std::move(HolderFactory.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx]))); } BlockBuilder_.Add(currentRow); FallbackReader_.Next(); @@ -497,6 +497,7 @@ class TSource : public TNonCopyable { } } + const THolderFactory& HolderFactory; private: NYT::NConcurrency::IThreadPoolPtr Pool_; std::mutex Mtx_; @@ -513,7 +514,6 @@ class TSource : public TNonCopyable { TListener::TPtr Listener_; TPtr Self_; size_t Inflight_; - const THolderFactory& HolderFactory_; NKikimr::NMiniKQL::IStatsRegistry* JobStats_; }; @@ -525,52 +525,48 @@ class TReaderState: public TComputationValue { , Source_(std::move(source)) , Width_(width) , Types_(arrowTypes) + , Result_(width) { } - EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { if (GotFinish_) { - return EFetchResult::Finish; + return NUdf::EFetchStatus::Finish; } + YQL_ENSURE(width == Width_ + 1); auto batch = Source_->Next(); if (!batch) { GotFinish_ = 1; Source_->Finish(); - return EFetchResult::Finish; + return NUdf::EFetchStatus::Finish; } + for (size_t i = 0; i < Width_; ++i) { - if (!output[i]) { - continue; - } - if(!batch->Columns[i].type()->Equals(Types_->at(i))) { - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(ARROW_RESULT(arrow::compute::Cast(batch->Columns[i], Types_->at(i)))); - continue; - } - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(batch->Columns[i])); - } - if (output[Width_]) { - *(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt))); + YQL_ENSURE(batch->Columns[i].type()->Equals(Types_->at(i))); + output[i] = Source_->HolderFactory.CreateArrowBlock(std::move(batch->Columns[i])); } - return EFetchResult::One; + output[Width_] = Source_->HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt))); + return NUdf::EFetchStatus::Ok; } private: TSource::TPtr Source_; const size_t Width_; std::shared_ptr>> Types_; + std::vector Result_; bool GotFinish_ = 0; }; }; -class TDqYtReadBlockWrapper : public TStatefulWideFlowComputationNode { -using TBaseComputation = TStatefulWideFlowComputationNode; +class TDqYtReadBlockWrapper : public TMutableComputationNode { +using TBaseComputation = TMutableComputationNode; public: TDqYtReadBlockWrapper(const TComputationNodeFactoryContext& ctx, const TString& clusterName, const TString& token, const NYT::TNode& inputSpec, const NYT::TNode& samplingSpec, const TVector& inputGroups, TType* itemType, const TVector& tableNames, TVector>&& tables, NKikimr::NMiniKQL::IStatsRegistry* jobStats, size_t inflight, - size_t timeout) : TBaseComputation(ctx.Mutables, this, EValueRepresentation::Boxed) + size_t timeout) : TBaseComputation(ctx.Mutables, EValueRepresentation::Boxed) , Width_(AS_TYPE(TStructType, itemType)->GetMembersCount()) , CodecCtx_(ctx.Env, ctx.FunctionRegistry, &ctx.HolderFactory) , ClusterName_(clusterName) @@ -588,6 +584,9 @@ using TBaseComputation = TStatefulWideFlowComputationNode 1, Tables_, SamplingSpec_); settings->Specs = &Specs_; settings->Pool = arrow::default_memory_pool(); @@ -598,14 +597,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode(std::move(settings), Inflight_, Type_, types, ctx.HolderFactory, JobStats_); source->SetSelfAndRun(source); - state = ctx.HolderFactory.Create(source, Width_, types); - } - - EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - if (!state.HasValue()) { - MakeState(ctx, state); - } - return static_cast(*state.AsBoxed()).FetchValues(ctx, output); + return ctx.HolderFactory.Create(source, Width_, types); } void RegisterDependencies() const final {} From 3682034d8da83ddb292932d91700173f2348220a Mon Sep 17 00:00:00 2001 From: MrLolthe1st Date: Fri, 22 Dec 2023 10:56:20 +0000 Subject: [PATCH 4/7] yson fix --- .../minikql/computation/mkql_block_reader.cpp | 8 +- .../computation/mkql_block_transport.cpp | 4 +- ydb/library/yql/minikql/mkql_type_builder.cpp | 4 +- .../yt/comp_nodes/dq/arrow_converter.cpp | 90 ++++++++++++++++--- .../yql/public/udf/arrow/block_reader.h | 21 +++-- 5 files changed, 99 insertions(+), 28 deletions(-) diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp index 88073f208b93..baa0c3f43622 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp @@ -184,7 +184,7 @@ struct TConverterTraits { using TTuple = TTupleBlockItemConverter; template using TFixedSize = TFixedSizeBlockItemConverter; - template + template using TStrings = TStringBlockItemConverter; using TExtOptional = TExternalOptionalBlockItemConverter; @@ -193,15 +193,15 @@ struct TConverterTraits { return std::make_unique>(); } else { if (desc.Typelen == -1) { - auto ret = std::make_unique>(); + auto ret = std::make_unique>(); ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen); return ret; } else if (desc.Typelen == -2) { - auto ret = std::make_unique>(); + auto ret = std::make_unique>(); ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen); return ret; } else { - auto ret = std::make_unique>(); + auto ret = std::make_unique>(); ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen); return ret; } diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index e7697f0c12db..99126496899e 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -500,7 +500,7 @@ struct TSerializerTraits { using TTuple = TTupleBlockSerializer; template using TFixedSize = TFixedSizeBlockSerializer; - template + template using TStrings = TStringBlockSerializer; using TExtOptional = TExtOptionalBlockSerializer; @@ -519,7 +519,7 @@ struct TDeserializerTraits { using TTuple = TTupleBlockDeserializer; template using TFixedSize = TFixedSizeBlockDeserializer; - template + template using TStrings = TStringBlockDeserializer; using TExtOptional = TExtOptionalBlockDeserializer; diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index 1aaf4cdb203f..a3db083b8f71 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -2425,7 +2425,7 @@ struct TComparatorTraits { using TTuple = NUdf::TTupleBlockItemComparator; template using TFixedSize = NUdf::TFixedSizeBlockItemComparator; - template + template using TStrings = NUdf::TStringBlockItemComparator; using TExtOptional = NUdf::TExternalOptionalBlockItemComparator; @@ -2441,7 +2441,7 @@ struct THasherTraits { using TTuple = NUdf::TTupleBlockItemHasher; template using TFixedSize = NUdf::TFixedSizeBlockItemHasher; - template + template using TStrings = NUdf::TStringBlockItemHasher; using TExtOptional = NUdf::TExternalOptionalBlockItemHasher; diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp index 6dd85df185a0..c728e6e8fe3e 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -205,6 +205,69 @@ class TYsonReaderDetails { size_t Available_; }; +namespace { +void SkipYson(TYsonReaderDetails& buf) { + switch (buf.Current()) { + case BeginListSymbol: { + buf.Next(); + for (;;) { + SkipYson(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + if (buf.Current() == EndListSymbol) { + break; + } + } + buf.Next(); + break; + } + case BeginMapSymbol: { + buf.Next(); + for (;;) { + SkipYson(buf); + YQL_ENSURE(buf.Current() == KeyValueSeparatorSymbol); + buf.Next(); + SkipYson(buf); + if (buf.Current() == KeyedItemSeparatorSymbol) { + buf.Next(); + } + if (buf.Current() == EndMapSymbol) { + break; + } + } + buf.Next(); + break; + } + case StringMarker: + buf.Next(); + buf.Skip(buf.ReadVarI32()); + break; + case Uint64Marker: + case Int64Marker: + buf.Next(); + Y_UNUSED(buf.ReadVarI64()); + break; + case TrueMarker: + case FalseMarker: + buf.Next(); + break; + case DoubleMarker: + buf.Next(); + Y_UNUSED(buf.NextDouble()); + break; + default: + YQL_ENSURE(false, "Unexpected char: " + std::string{buf.Current()}); + } +} + +NUdf::TBlockItem ReadYson(TYsonReaderDetails& buf) { + const char* beg = buf.Data(); + SkipYson(buf); + return NUdf::TBlockItem(std::string_view(beg, buf.Data() - beg)); +} +}; + class IYsonBlockReader { public: virtual NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) = 0; @@ -272,7 +335,7 @@ class TYsonTupleBlockReader final : public IYsonBlockReaderWithNativeFlag Items_; }; -template +template class TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag { public: NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { @@ -281,13 +344,18 @@ class TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag> Children_; @@ -300,7 +368,7 @@ struct TYtColumnConverterSettings { NKikimr::NMiniKQL::TType* Type; const NUdf::IPgBuilder* PgBuilder; arrow::MemoryPool& Pool; - bool IsNative; + const bool IsNative; bool IsTopOptional = false; std::shared_ptr ArrowType; std::unique_ptr Builder; @@ -397,8 +465,8 @@ struct TYsonBlockReaderTraits { using TTuple = TYsonTupleBlockReader; template using TFixedSize = TYsonFixedSizeBlockReader; - template - using TStrings = TYsonStringBlockReader; + template + using TStrings = TYsonStringBlockReader; using TExtOptional = TYsonExternalOptBlockReader; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { @@ -406,7 +474,7 @@ struct TYsonBlockReaderTraits { if (desc.PassByValue) { return std::make_unique>(); } else { - return std::make_unique>(); + return std::make_unique>(); } } diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h index 6b8200e50523..fb78f0c5dd54 100644 --- a/ydb/library/yql/public/udf/arrow/block_reader.h +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -97,7 +97,7 @@ class TFixedSizeBlockReader final : public IBlockReader { } }; -template +template class TStringBlockReader final : public IBlockReader { public: using TOffset = typename TStringType::offset_type; @@ -364,8 +364,8 @@ struct TReaderTraits { using TTuple = TTupleBlockReader; template using TFixedSize = TFixedSizeBlockReader; - template - using TStrings = TStringBlockReader; + template + using TStrings = TStringBlockReader; using TExtOptional = TExternalOptionalBlockReader; static std::unique_ptr MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) { @@ -373,7 +373,7 @@ struct TReaderTraits { if (desc.PassByValue) { return std::make_unique>(); } else { - return std::make_unique>(); + return std::make_unique>(); } } }; @@ -396,12 +396,12 @@ std::unique_ptr MakeFixedSizeBlockReaderImpl(bool isO } } -template +template std::unique_ptr MakeStringBlockReaderImpl(bool isOptional) { if (isOptional) { - return std::make_unique>(); + return std::make_unique>(); } else { - return std::make_unique>(); + return std::make_unique>(); } } @@ -489,12 +489,15 @@ std::unique_ptr MakeBlockReaderImpl(const ITypeInfoHe case NUdf::EDataSlot::Double: return MakeFixedSizeBlockReaderImpl(isOptional); case NUdf::EDataSlot::String: + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Yson: + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::JsonDocument: - return MakeStringBlockReaderImpl(isOptional); + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Utf8: + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Json: - return MakeStringBlockReaderImpl(isOptional); + return MakeStringBlockReaderImpl(isOptional); default: Y_ENSURE(false, "Unsupported data slot"); } From 30e69ac09d4ee9227a8f2b4ec8902bd20e6dc253 Mon Sep 17 00:00:00 2001 From: MrLolthe1st Date: Fri, 22 Dec 2023 16:31:10 +0000 Subject: [PATCH 5/7] fix --- .../yt/comp_nodes/dq/arrow_converter.cpp | 104 ++++++------------ 1 file changed, 32 insertions(+), 72 deletions(-) diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp index c728e6e8fe3e..98961195376a 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -23,38 +23,13 @@ struct TypeHelper { }; #define GEN_TYPE(type)\ - NumericConverterImpl + NumericConverterImpl #define GEN_TYPE_STR(type)\ - StringConverterImpl + StringConverterImpl -template +template arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr block) { - if constexpr (!IsDictionary) { - typename ::arrow::TypeTraits::ArrayType val(block); // checking for compatibility - if (val.null_count()) { - for (i64 i = 0; i < block->length; ++i) { - if (val.IsNull(i)) { - builder->Add(NUdf::TBlockItem{}); - } else { - if constexpr (std::is_same_v) { - builder->Add(NUdf::TBlockItem((ui8)val.Value(i))); - } else { - builder->Add(NUdf::TBlockItem(val.Value(i))); - } - } - } - } else { - for (i64 i = 0; i < block->length; ++i) { - if constexpr (std::is_same_v) { - builder->Add(NUdf::TBlockItem((ui8)val.Value(i))); - } else { - builder->Add(NUdf::TBlockItem(val.Value(i))); - } - } - } - return builder->Build(false); - } arrow::DictionaryArray dict(block); typename ::arrow::TypeTraits::ArrayType val(dict.dictionary()->data()); auto data = dict.indices()->data()->GetValues(1); @@ -82,29 +57,8 @@ arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr< return builder->Build(false); } -template +template arrow::Datum StringConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr block) { - if constexpr (!IsDictionary) { - typename ::arrow::TypeTraits::ArrayType val(block); // checking for compatibility - if (val.null_count()) { - for (i64 i = 0; i < block->length; ++i) { - if (val.IsNull(i)) { - builder->Add(NUdf::TBlockItem{}); - } else { - i32 len; - auto ptr = reinterpret_cast(val.GetValue(i, &len)); - builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); - } - } - } else { - for (i64 i = 0; i < block->length; ++i) { - i32 len; - auto ptr = reinterpret_cast(val.GetValue(i, &len)); - builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); - } - } - return builder->Build(false); - } arrow::DictionaryArray dict(block); typename ::arrow::TypeTraits::ArrayType val(dict.dictionary()->data()); auto data = dict.indices()->data()->GetValues(1); @@ -222,7 +176,9 @@ void SkipYson(TYsonReaderDetails& buf) { buf.Next(); break; } + case BeginAttributesSymbol: case BeginMapSymbol: { + auto originalEnd = buf.Current() == BeginMapSymbol ? EndMapSymbol : EndAttributesSymbol; buf.Next(); for (;;) { SkipYson(buf); @@ -232,7 +188,7 @@ void SkipYson(TYsonReaderDetails& buf) { if (buf.Current() == KeyedItemSeparatorSymbol) { buf.Next(); } - if (buf.Current() == EndMapSymbol) { + if (buf.Current() == originalEnd) { break; } } @@ -369,7 +325,7 @@ struct TYtColumnConverterSettings { const NUdf::IPgBuilder* PgBuilder; arrow::MemoryPool& Pool; const bool IsNative; - bool IsTopOptional = false; + const bool IsTopOptional; std::shared_ptr ArrowType; std::unique_ptr Builder; }; @@ -484,26 +440,31 @@ template class TPrimitiveColumnConverter { public: TPrimitiveColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) { - switch (Settings_.ArrowType->id()) { - case arrow::Type::BOOL: PrimitiveConverterImpl_ = GEN_TYPE(Boolean); break; - case arrow::Type::INT8: PrimitiveConverterImpl_ = GEN_TYPE(Int8); break; - case arrow::Type::UINT8: PrimitiveConverterImpl_ = GEN_TYPE(UInt8); break; - case arrow::Type::INT16: PrimitiveConverterImpl_ = GEN_TYPE(Int16); break; - case arrow::Type::UINT16: PrimitiveConverterImpl_ = GEN_TYPE(UInt16); break; - case arrow::Type::INT32: PrimitiveConverterImpl_ = GEN_TYPE(Int32); break; - case arrow::Type::UINT32: PrimitiveConverterImpl_ = GEN_TYPE(UInt32); break; - case arrow::Type::INT64: PrimitiveConverterImpl_ = GEN_TYPE(Int64); break; - case arrow::Type::UINT64: PrimitiveConverterImpl_ = GEN_TYPE(UInt64); break; - case arrow::Type::DOUBLE: PrimitiveConverterImpl_ = GEN_TYPE(Double); break; - case arrow::Type::FLOAT: PrimitiveConverterImpl_ = GEN_TYPE(Float); break; - case arrow::Type::STRING: PrimitiveConverterImpl_ = GEN_TYPE_STR(String); break; - case arrow::Type::BINARY: PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break; - default: - return; // will check in runtime - }; + if constexpr (IsDictionary) { + switch (Settings_.ArrowType->id()) { + case arrow::Type::BOOL: PrimitiveConverterImpl_ = GEN_TYPE(Boolean); break; + case arrow::Type::INT8: PrimitiveConverterImpl_ = GEN_TYPE(Int8); break; + case arrow::Type::UINT8: PrimitiveConverterImpl_ = GEN_TYPE(UInt8); break; + case arrow::Type::INT16: PrimitiveConverterImpl_ = GEN_TYPE(Int16); break; + case arrow::Type::UINT16: PrimitiveConverterImpl_ = GEN_TYPE(UInt16); break; + case arrow::Type::INT32: PrimitiveConverterImpl_ = GEN_TYPE(Int32); break; + case arrow::Type::UINT32: PrimitiveConverterImpl_ = GEN_TYPE(UInt32); break; + case arrow::Type::INT64: PrimitiveConverterImpl_ = GEN_TYPE(Int64); break; + case arrow::Type::UINT64: PrimitiveConverterImpl_ = GEN_TYPE(UInt64); break; + case arrow::Type::DOUBLE: PrimitiveConverterImpl_ = GEN_TYPE(Double); break; + case arrow::Type::FLOAT: PrimitiveConverterImpl_ = GEN_TYPE(Float); break; + case arrow::Type::STRING: PrimitiveConverterImpl_ = GEN_TYPE_STR(String); break; + case arrow::Type::BINARY: PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break; + default: + return; // will check in runtime + }; + } } arrow::Datum Convert(std::shared_ptr block) { - return PrimitiveConverterImpl_(Settings_.Builder.get(), block); + if constexpr (IsDictionary) { + return PrimitiveConverterImpl_(Settings_.Builder.get(), block); + } + return block; } private: TYtColumnConverterSettings& Settings_; @@ -620,11 +581,10 @@ class TYtColumnConverter final : public IYtColumnConverter { }; TYtColumnConverterSettings::TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative) - : Type(type), PgBuilder(pgBuilder), Pool(pool), IsNative(isNative) + : Type(type), PgBuilder(pgBuilder), Pool(pool), IsNative(isNative), IsTopOptional(!isNative && type->IsOptional()) { if (!isNative) { if (Type->IsOptional()) { - IsTopOptional = true; Type = static_cast(Type)->GetItemType(); } } From ca1f264b64a6061cbcfb7486cd583b4243778de1 Mon Sep 17 00:00:00 2001 From: MrLolthe1st Date: Mon, 25 Dec 2023 11:25:33 +0000 Subject: [PATCH 6/7] fix --- .../yql/minikql/computation/mkql_block_reader.cpp | 2 +- .../yql/minikql/computation/mkql_block_transport.cpp | 4 ++-- ydb/library/yql/minikql/mkql_type_builder.cpp | 4 ++-- .../yt/provider/ut/yql_yt_dq_integration_ut.cpp | 3 +-- .../providers/yt/provider/yql_yt_dq_integration.cpp | 10 ++++------ .../providers/yt/provider/yql_yt_dq_integration.h | 2 +- .../yql/providers/yt/provider/yql_yt_provider.cpp | 2 +- ydb/library/yql/public/udf/arrow/block_reader.h | 12 ++++++------ 8 files changed, 18 insertions(+), 21 deletions(-) diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp index baa0c3f43622..1c92c389a6e5 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp @@ -184,7 +184,7 @@ struct TConverterTraits { using TTuple = TTupleBlockItemConverter; template using TFixedSize = TFixedSizeBlockItemConverter; - template + template using TStrings = TStringBlockItemConverter; using TExtOptional = TExternalOptionalBlockItemConverter; diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index 99126496899e..6cebab459452 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -500,7 +500,7 @@ struct TSerializerTraits { using TTuple = TTupleBlockSerializer; template using TFixedSize = TFixedSizeBlockSerializer; - template + template using TStrings = TStringBlockSerializer; using TExtOptional = TExtOptionalBlockSerializer; @@ -519,7 +519,7 @@ struct TDeserializerTraits { using TTuple = TTupleBlockDeserializer; template using TFixedSize = TFixedSizeBlockDeserializer; - template + template using TStrings = TStringBlockDeserializer; using TExtOptional = TExtOptionalBlockDeserializer; diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index a3db083b8f71..f74999bc42fe 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -2425,7 +2425,7 @@ struct TComparatorTraits { using TTuple = NUdf::TTupleBlockItemComparator; template using TFixedSize = NUdf::TFixedSizeBlockItemComparator; - template + template using TStrings = NUdf::TStringBlockItemComparator; using TExtOptional = NUdf::TExternalOptionalBlockItemComparator; @@ -2441,7 +2441,7 @@ struct THasherTraits { using TTuple = NUdf::TTupleBlockItemHasher; template using TFixedSize = NUdf::TFixedSizeBlockItemHasher; - template + template using TStrings = NUdf::TStringBlockItemHasher; using TExtOptional = NUdf::TExternalOptionalBlockItemHasher; diff --git a/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp b/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp index d5d2076cfde5..9fd0ea839d53 100644 --- a/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp +++ b/ydb/library/yql/providers/yt/provider/ut/yql_yt_dq_integration_ut.cpp @@ -13,8 +13,7 @@ struct TTestSetup { , State(MakeIntrusive()) { State->Types = TypesCtx.Get(); - auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::IBuiltinFunctionRegistry::TPtr(nullptr)); - State->DqIntegration_ = CreateYtDqIntegration(State.Get(), functionRegistry.Get()); + State->DqIntegration_ = CreateYtDqIntegration(State.Get()); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index d567fbadba43..135bfca1a179 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -40,9 +40,8 @@ using namespace NNodes; class TYtDqIntegration: public TDqIntegrationBase { public: - TYtDqIntegration(TYtState* state, const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry) + TYtDqIntegration(TYtState* state) : State_(state) - , ArrowResolver_(MakeSimpleArrowResolver(functionRegistry)) { } @@ -377,7 +376,7 @@ class TYtDqIntegration: public TDqIntegrationBase { subTypeAnn.emplace_back(type->GetItemType()); } - if (ArrowResolver_->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) { + if (State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) { return false; } @@ -675,12 +674,11 @@ class TYtDqIntegration: public TDqIntegrationBase { private: TYtState* State_; - IArrowResolver::TPtr ArrowResolver_; }; -THolder CreateYtDqIntegration(TYtState* state, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry) { +THolder CreateYtDqIntegration(TYtState* state) { Y_ABORT_UNLESS(state); - return MakeHolder(state, *functionRegistry); + return MakeHolder(state); } } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h index 67ca368005bd..737d29b7b0ee 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.h @@ -9,6 +9,6 @@ namespace NYql { -THolder CreateYtDqIntegration(TYtState* state, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry); +THolder CreateYtDqIntegration(TYtState* state); } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp index a1979fc5891c..0b7bd3551d7b 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_provider.cpp @@ -356,7 +356,7 @@ TDataProviderInitializer GetYtNativeDataProviderInitializer(IYtGateway::TPtr gat ytState->SessionId = sessionId; ytState->Gateway = gateway; ytState->Types = typeCtx.Get(); - ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get(), functionRegistry); + ytState->DqIntegration_ = CreateYtDqIntegration(ytState.Get()); TStatWriter statWriter = [ytState](ui32 publicId, const TVector& stat) { with_lock(ytState->StatisticsMutex) { diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h index fb78f0c5dd54..3985a44ac271 100644 --- a/ydb/library/yql/public/udf/arrow/block_reader.h +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -97,7 +97,7 @@ class TFixedSizeBlockReader final : public IBlockReader { } }; -template +template class TStringBlockReader final : public IBlockReader { public: using TOffset = typename TStringType::offset_type; @@ -364,8 +364,8 @@ struct TReaderTraits { using TTuple = TTupleBlockReader; template using TFixedSize = TFixedSizeBlockReader; - template - using TStrings = TStringBlockReader; + template + using TStrings = TStringBlockReader; using TExtOptional = TExternalOptionalBlockReader; static std::unique_ptr MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) { @@ -396,12 +396,12 @@ std::unique_ptr MakeFixedSizeBlockReaderImpl(bool isO } } -template +template std::unique_ptr MakeStringBlockReaderImpl(bool isOptional) { if (isOptional) { - return std::make_unique>(); + return std::make_unique>(); } else { - return std::make_unique>(); + return std::make_unique>(); } } From e91aeb800c0fb2b0eae2fd5bce362291bbbcdaca Mon Sep 17 00:00:00 2001 From: MrLolthe1st Date: Mon, 25 Dec 2023 11:53:08 +0000 Subject: [PATCH 7/7] remove unused --- ydb/library/yql/providers/yt/provider/ya.make | 1 - ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp | 1 - 2 files changed, 2 deletions(-) diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make index 13608af4f110..3c132cc5f675 100644 --- a/ydb/library/yql/providers/yt/provider/ya.make +++ b/ydb/library/yql/providers/yt/provider/ya.make @@ -73,7 +73,6 @@ PEERDIR( ydb/library/yql/providers/common/mkql ydb/library/yql/providers/common/proto ydb/library/yql/providers/common/activation - ydb/library/yql/providers/common/arrow_resolve ydb/library/yql/providers/common/provider ydb/library/yql/providers/common/schema/expr ydb/library/yql/providers/common/transform diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index 135bfca1a179..11e5b0490191 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include #include