Skip to content

Commit

Permalink
yson fix
Browse files Browse the repository at this point in the history
  • Loading branch information
MrLolthe1st committed Dec 22, 2023
1 parent 4f60da1 commit 648164a
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 28 deletions.
8 changes: 4 additions & 4 deletions ydb/library/yql/minikql/computation/mkql_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ struct TConverterTraits {
using TTuple = TTupleBlockItemConverter<Nullable>;
template <typename T, bool Nullable>
using TFixedSize = TFixedSizeBlockItemConverter<T, Nullable>;
template <typename TStringType, bool Nullable, NUdf::EPgStringType PgString = NUdf::EPgStringType::None>
template <typename TStringType, bool Nullable, NUdf::EDataSlot OriginalT = NUdf::EDataSlot::String, NUdf::EPgStringType PgString = NUdf::EPgStringType::None>
using TStrings = TStringBlockItemConverter<TStringType, Nullable, PgString>;
using TExtOptional = TExternalOptionalBlockItemConverter;

Expand All @@ -193,15 +193,15 @@ struct TConverterTraits {
return std::make_unique<TFixedSize<ui64, true>>();
} else {
if (desc.Typelen == -1) {
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EPgStringType::Text>>();
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EDataSlot::String, NUdf::EPgStringType::Text>>();
ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen);
return ret;
} else if (desc.Typelen == -2) {
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EPgStringType::CString>>();
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EDataSlot::String, NUdf::EPgStringType::CString>>();
ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen);
return ret;
} else {
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EPgStringType::Fixed>>();
auto ret = std::make_unique<TStrings<arrow::BinaryType, true, NUdf::EDataSlot::String, NUdf::EPgStringType::Fixed>>();
ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen);
return ret;
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/minikql/computation/mkql_block_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ struct TSerializerTraits {
using TTuple = TTupleBlockSerializer<Nullable>;
template <typename T, bool Nullable>
using TFixedSize = TFixedSizeBlockSerializer<sizeof(T), Nullable>;
template <typename TStringType, bool Nullable>
template <typename TStringType, bool Nullable, NUdf::EDataSlot OriginalT = NUdf::EDataSlot::String>
using TStrings = TStringBlockSerializer<TStringType, Nullable>;
using TExtOptional = TExtOptionalBlockSerializer;

Expand All @@ -519,7 +519,7 @@ struct TDeserializerTraits {
using TTuple = TTupleBlockDeserializer<Nullable>;
template <typename T, bool Nullable>
using TFixedSize = TFixedSizeBlockDeserializer<sizeof(T), Nullable>;
template <typename TStringType, bool Nullable>
template <typename TStringType, bool Nullable, NUdf::EDataSlot OriginalT = NUdf::EDataSlot::String>
using TStrings = TStringBlockDeserializer<TStringType, Nullable>;
using TExtOptional = TExtOptionalBlockDeserializer;

Expand Down
4 changes: 2 additions & 2 deletions ydb/library/yql/minikql/mkql_type_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2425,7 +2425,7 @@ struct TComparatorTraits {
using TTuple = NUdf::TTupleBlockItemComparator<Nullable>;
template <typename T, bool Nullable>
using TFixedSize = NUdf::TFixedSizeBlockItemComparator<T, Nullable>;
template <typename TStringType, bool Nullable>
template <typename TStringType, bool Nullable, NUdf::EDataSlot OriginalT = NUdf::EDataSlot::String>
using TStrings = NUdf::TStringBlockItemComparator<TStringType, Nullable>;
using TExtOptional = NUdf::TExternalOptionalBlockItemComparator;

Expand All @@ -2441,7 +2441,7 @@ struct THasherTraits {
using TTuple = NUdf::TTupleBlockItemHasher<Nullable>;
template <typename T, bool Nullable>
using TFixedSize = NUdf::TFixedSizeBlockItemHasher<T, Nullable>;
template <typename TStringType, bool Nullable>
template <typename TStringType, bool Nullable, NUdf::EDataSlot OriginalT = NUdf::EDataSlot::String>
using TStrings = NUdf::TStringBlockItemHasher<TStringType, Nullable>;
using TExtOptional = NUdf::TExternalOptionalBlockItemHasher;

Expand Down
90 changes: 79 additions & 11 deletions ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,69 @@ class TYsonReaderDetails {
size_t Available_;
};

namespace {
void SkipYson(TYsonReaderDetails& buf) {
switch (buf.Current()) {
case BeginListSymbol: {
buf.Next();
for (;;) {
SkipYson(buf);
if (buf.Current() == ListItemSeparatorSymbol) {
buf.Next();
}
if (buf.Current() == EndListSymbol) {
break;
}
}
buf.Next();
break;
}
case BeginMapSymbol: {
buf.Next();
for (;;) {
SkipYson(buf);
YQL_ENSURE(buf.Current() == KeyValueSeparatorSymbol);
buf.Next();
SkipYson(buf);
if (buf.Current() == KeyedItemSeparatorSymbol) {
buf.Next();
}
if (buf.Current() == EndMapSymbol) {
break;
}
}
buf.Next();
break;
}
case StringMarker:
buf.Next();
buf.Skip(buf.ReadVarI32());
break;
case Uint64Marker:
case Int64Marker:
buf.Next();
Y_UNUSED(buf.ReadVarI64());
break;
case TrueMarker:
case FalseMarker:
buf.Next();
break;
case DoubleMarker:
buf.Next();
Y_UNUSED(buf.NextDouble());
break;
default:
YQL_ENSURE(false, "Unexcepted char: " + std::string{buf.Current()});
}
}

NUdf::TBlockItem ReadYson(TYsonReaderDetails& buf) {
const char* beg = buf.Data();
SkipYson(buf);
return NUdf::TBlockItem(std::string_view(beg, buf.Data() - beg));
}
};

class IYsonBlockReader {
public:
virtual NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) = 0;
Expand Down Expand Up @@ -272,7 +335,7 @@ class TYsonTupleBlockReader final : public IYsonBlockReaderWithNativeFlag<Native
TVector<NUdf::TBlockItem> Items_;
};

template<typename T, bool Nullable, bool Native>
template<typename T, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT, bool Native>
class TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag<Native> {
public:
NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final {
Expand All @@ -281,13 +344,18 @@ class TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag<Nativ
}
return GetNotNull(buf);
}

NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final {
YQL_ENSURE(buf.Current() == StringMarker);
buf.Next();
const i32 length = buf.ReadVarI32();
auto res = NUdf::TBlockItem(NUdf::TStringRef(buf.Data(), length));
buf.Skip(length);
return res;
if constexpr (NUdf::EDataSlot::Yson != OriginalT) {
YQL_ENSURE(buf.Current() == StringMarker);
buf.Next();
const i32 length = buf.ReadVarI32();
auto res = NUdf::TBlockItem(NUdf::TStringRef(buf.Data(), length));
buf.Skip(length);
return res;
} else {
return ReadYson(buf);
}
}
private:
const TVector<std::unique_ptr<IYsonBlockReader>> Children_;
Expand All @@ -300,7 +368,7 @@ struct TYtColumnConverterSettings {
NKikimr::NMiniKQL::TType* Type;
const NUdf::IPgBuilder* PgBuilder;
arrow::MemoryPool& Pool;
bool IsNative;
const bool IsNative;
bool IsTopOptional = false;
std::shared_ptr<arrow::DataType> ArrowType;
std::unique_ptr<NKikimr::NUdf::IArrayBuilder> Builder;
Expand Down Expand Up @@ -397,16 +465,16 @@ struct TYsonBlockReaderTraits {
using TTuple = TYsonTupleBlockReader<Nullable, Native>;
template <typename T, bool Nullable>
using TFixedSize = TYsonFixedSizeBlockReader<T, Nullable, Native>;
template <typename TStringType, bool Nullable>
using TStrings = TYsonStringBlockReader<TStringType, Nullable, Native>;
template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT>
using TStrings = TYsonStringBlockReader<TStringType, Nullable, OriginalT, Native>;
using TExtOptional = TYsonExternalOptBlockReader<Native>;

static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
if (desc.PassByValue) {
return std::make_unique<TFixedSize<ui64, true>>();
} else {
return std::make_unique<TStrings<arrow::BinaryType, true>>();
return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>();
}
}

Expand Down
21 changes: 12 additions & 9 deletions ydb/library/yql/public/udf/arrow/block_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class TFixedSizeBlockReader final : public IBlockReader {
}
};

template<typename TStringType, bool Nullable>
template<typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT = NKikimr::NUdf::EDataSlot::String>
class TStringBlockReader final : public IBlockReader {
public:
using TOffset = typename TStringType::offset_type;
Expand Down Expand Up @@ -364,16 +364,16 @@ struct TReaderTraits {
using TTuple = TTupleBlockReader<Nullable>;
template <typename T, bool Nullable>
using TFixedSize = TFixedSizeBlockReader<T, Nullable>;
template <typename TStringType, bool Nullable>
using TStrings = TStringBlockReader<TStringType, Nullable>;
template <typename TStringType, bool Nullable, NKikimr::NUdf::EDataSlot OriginalT>
using TStrings = TStringBlockReader<TStringType, Nullable, OriginalT>;
using TExtOptional = TExternalOptionalBlockReader;

static std::unique_ptr<TResult> MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) {
Y_UNUSED(pgBuilder);
if (desc.PassByValue) {
return std::make_unique<TFixedSize<ui64, true>>();
} else {
return std::make_unique<TStrings<arrow::BinaryType, true>>();
return std::make_unique<TStrings<arrow::BinaryType, true, NKikimr::NUdf::EDataSlot::String>>();
}
}
};
Expand All @@ -396,12 +396,12 @@ std::unique_ptr<typename TTraits::TResult> MakeFixedSizeBlockReaderImpl(bool isO
}
}

template <typename TTraits, typename T>
template <typename TTraits, typename T, NKikimr::NUdf::EDataSlot OriginalT>
std::unique_ptr<typename TTraits::TResult> MakeStringBlockReaderImpl(bool isOptional) {
if (isOptional) {
return std::make_unique<typename TTraits::template TStrings<T, true>>();
return std::make_unique<typename TTraits::template TStrings<T, true, OriginalT>>();
} else {
return std::make_unique<typename TTraits::template TStrings<T, false>>();
return std::make_unique<typename TTraits::template TStrings<T, false, OriginalT>>();
}
}

Expand Down Expand Up @@ -489,12 +489,15 @@ std::unique_ptr<typename TTraits::TResult> MakeBlockReaderImpl(const ITypeInfoHe
case NUdf::EDataSlot::Double:
return MakeFixedSizeBlockReaderImpl<TTraits, double>(isOptional);
case NUdf::EDataSlot::String:
return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::String>(isOptional);
case NUdf::EDataSlot::Yson:
return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::Yson>(isOptional);
case NUdf::EDataSlot::JsonDocument:
return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType>(isOptional);
return MakeStringBlockReaderImpl<TTraits, arrow::BinaryType, NUdf::EDataSlot::JsonDocument>(isOptional);
case NUdf::EDataSlot::Utf8:
return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Utf8>(isOptional);
case NUdf::EDataSlot::Json:
return MakeStringBlockReaderImpl<TTraits, arrow::StringType>(isOptional);
return MakeStringBlockReaderImpl<TTraits, arrow::StringType, NUdf::EDataSlot::Json>(isOptional);
default:
Y_ENSURE(false, "Unsupported data slot");
}
Expand Down

0 comments on commit 648164a

Please sign in to comment.