From 79dba10c64cd7d50cdd3d384b8dbe2003bffb44e Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 23 Oct 2024 11:26:37 +0000 Subject: [PATCH 1/6] Passed unboxed values from parser to filter --- .../fq/libs/row_dispatcher/json_filter.cpp | 88 +++--- ydb/core/fq/libs/row_dispatcher/json_filter.h | 5 +- .../fq/libs/row_dispatcher/json_parser.cpp | 253 ++++++++++++------ ydb/core/fq/libs/row_dispatcher/json_parser.h | 5 +- .../fq/libs/row_dispatcher/topic_session.cpp | 14 +- .../libs/row_dispatcher/ut/json_filter_ut.cpp | 85 ++++-- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 172 +++++++----- .../row_dispatcher/ut/topic_session_ut.cpp | 9 +- .../pq/provider/yql_pq_dq_integration.cpp | 3 +- ydb/tests/fq/yds/test_row_dispatcher.py | 10 +- 10 files changed, 409 insertions(+), 235 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 165612121d54..c4f1e3dd2436 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -10,6 +11,8 @@ #include #include +#include + namespace { @@ -23,6 +26,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) { .Add(fieldType); } +NYT::TNode CreateOptionalTypeNode(const TString& fieldType) { + return NYT::TNode::CreateList() + .Add("OptionalType") + .Add(CreateTypeNode(fieldType)); +} + void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { node.Add( NYT::TNode::CreateList() @@ -31,18 +40,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy ); } -void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { - node.Add(NYT::TNode::CreateList() - .Add(fieldName) - .Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType))) +void AddTypedField(NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) { + NYT::TNode parsedType; + Y_ENSURE(NYql::NCommon::ParseYson(parsedType, fieldTypeYson, Cerr), "Invalid field type"); + + // TODO: remove this when the re-parsing is removed from pq read actor + if (parsedType == CreateTypeNode("Json")) { + parsedType = CreateTypeNode("String"); + } else if (parsedType == CreateOptionalTypeNode("Json")) { + parsedType = CreateOptionalTypeNode("String"); + } + + node.Add( + NYT::TNode::CreateList() + .Add(fieldName) + .Add(parsedType) ); } -NYT::TNode MakeInputSchema(const TVector& columns) { +NYT::TNode MakeInputSchema(const TVector& columns, const TVector& types) { auto structMembers = NYT::TNode::CreateList(); AddField(structMembers, OffsetFieldName, "Uint64"); - for (const auto& col : columns) { - AddOptionalField(structMembers, col, "String"); + for (size_t i = 0; i < columns.size(); ++i) { + AddTypedField(structMembers, columns[i], types[i]); } return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); } @@ -68,7 +88,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { TVector Schemas; }; -class TFilterInputConsumer : public NYql::NPureCalc::IConsumer&, const TVector>&>> { +class TFilterInputConsumer : public NYql::NPureCalc::IConsumer&, const TVector&>> { public: TFilterInputConsumer( const TFilterInputSpec& spec, @@ -106,7 +126,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer&, const TVector>&> values) override { + void OnObject(std::pair&, const TVector&> values) override { Y_ENSURE(FieldsPositions.size() == values.second.size()); NKikimr::NMiniKQL::TThrowingBindTerminator bind; @@ -114,7 +134,7 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumerGetGraph().GetHolderFactory(); // TODO: use blocks here - for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) { + for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) { NYql::NUdf::TUnboxedValue* items = nullptr; NYql::NUdf::TUnboxedValue result = Cache.NewArray( @@ -126,13 +146,16 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumerat(rowId); } Worker->Push(std::move(result)); } + + // Clear cache after each object because + // values allocated on another allocator and should be released + Cache.Clear(); + Worker->GetGraph().Invalidate(); } } @@ -216,7 +239,7 @@ struct NYql::NPureCalc::TInputSpecTraits { static constexpr bool IsPartial = false; static constexpr bool SupportPushStreamMode = true; - using TConsumerType = THolder&, const TVector>&>>>; + using TConsumerType = THolder&, const TVector&>>>; static TConsumerType MakeConsumer( const TFilterInputSpec& spec, @@ -244,12 +267,15 @@ class TJsonFilter::TImpl { const TVector& types, const TString& whereFilter, TCallback callback) - : Sql(GenerateSql(columns, types, whereFilter)) { + : Sql(GenerateSql(whereFilter)) { + Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions()); + // Program should be stateless because input values + // allocated on another allocator and should be released LOG_ROW_DISPATCHER_DEBUG("Creating program..."); Program = factory->MakePushStreamProgram( - TFilterInputSpec(MakeInputSchema(columns)), + TFilterInputSpec(MakeInputSchema(columns, types)), TFilterOutputSpec(MakeOutputSchema()), Sql, NYql::NPureCalc::ETranslationMode::SQL @@ -258,7 +284,7 @@ class TJsonFilter::TImpl { LOG_ROW_DISPATCHER_DEBUG("Program created"); } - void Push(const TVector& offsets, const TVector>& values) { + void Push(const TVector& offsets, const TVector& values) { Y_ENSURE(values, "Expected non empty schema"); InputConsumer->OnObject(std::make_pair(offsets, values)); } @@ -268,29 +294,9 @@ class TJsonFilter::TImpl { } private: - TString GenerateSql(const TVector& columnNames, const TVector& columnTypes, const TString& whereFilter) { + TString GenerateSql(const TString& whereFilter) { TStringStream str; - str << "$fields = SELECT "; - Y_ABORT_UNLESS(columnNames.size() == columnTypes.size()); - str << OffsetFieldName << ", "; - for (size_t i = 0; i < columnNames.size(); ++i) { - TString columnType = columnTypes[i]; - TString columnName = NFq::EncloseAndEscapeString(columnNames[i], '`'); - if (columnType == "Json") { - columnType = "String"; - } else if (columnType == "Optional") { - columnType = "Optional"; - } - - if (columnType.StartsWith("Optional")) { - str << "IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << ")), NULL)"; - } else { - str << "Unwrap(CAST(" << columnName << " as " << columnType << "))"; - } - str << " as " << columnName << ((i != columnNames.size() - 1) ? "," : ""); - } - str << " FROM Input;\n"; - str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n"; + str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n"; str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName; str << "\"])))) as data FROM $filtered"; @@ -300,7 +306,7 @@ class TJsonFilter::TImpl { private: THolder> Program; - THolder&, const TVector>&>>> InputConsumer; + THolder&, const TVector&>>> InputConsumer; const TString Sql; }; @@ -315,7 +321,7 @@ TJsonFilter::TJsonFilter( TJsonFilter::~TJsonFilter() { } -void TJsonFilter::Push(const TVector& offsets, const TVector>& values) { +void TJsonFilter::Push(const TVector& offsets, const TVector& values) { Impl->Push(offsets, values); } diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index f3435763ce3e..c4435bd9bab7 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -1,7 +1,6 @@ #pragma once -#include -#include +#include namespace NFq { @@ -18,7 +17,7 @@ class TJsonFilter { ~TJsonFilter(); - void Push(const TVector& offsets, const TVector>& values); + void Push(const TVector& offsets, const TVector& values); TString GetSql(); private: diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 14c807fc0226..2afcc8dbf77d 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -2,6 +2,13 @@ #include +#include +#include +#include +#include +#include +#include + #include #include @@ -25,7 +32,7 @@ struct TJsonParserBuffer { } void Reserve(size_t size, size_t numberValues) { - Values.reserve(2 * (size + simdjson::SIMDJSON_PADDING)); + Values.reserve(size + simdjson::SIMDJSON_PADDING); Offsets.reserve(numberValues); } @@ -45,18 +52,10 @@ struct TJsonParserBuffer { } } - std::string_view AddHolder(std::string_view value) { - Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders"); - const size_t startPos = Values.size(); - Values << value; - return std::string_view(Values).substr(startPos, value.length()); - } - std::pair Finish() { Y_ENSURE(!Finished, "Cannot finish buffer twice"); Finished = true; Values << TString(simdjson::SIMDJSON_PADDING, ' '); - Values.reserve(2 * Values.size()); return {Values.data(), Values.size()}; } @@ -82,24 +81,33 @@ namespace NFq { class TJsonParser::TImpl { struct TColumnDescription { std::string Name; - TString Type; + TString TypeYson; + NKikimr::NMiniKQL::TType* Type; }; public: TImpl(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) - : BatchSize(batchSize) + : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) + , TypeEnv(Alloc) + , BatchSize(batchSize) , BatchCreationTimeout(batchCreationTimeout) , ParsedValues(columns.size()) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name()); - Columns.reserve(columns.size()); - for (size_t i = 0; i < columns.size(); i++) { - Columns.emplace_back(TColumnDescription{ - .Name = columns[i], - .Type = SkipOptional(types[i]) - }); + with_lock (Alloc) { + auto functonRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, {}); + NKikimr::NMiniKQL::TProgramBuilder programBuilder(TypeEnv, *functonRegistry); + + Columns.reserve(columns.size()); + for (size_t i = 0; i < columns.size(); i++) { + Columns.emplace_back(TColumnDescription{ + .Name = columns[i], + .TypeYson = types[i], + .Type = NYql::NCommon::ParseTypeFromYson(TStringBuf(types[i]), programBuilder, Cerr) + }); + } } ColumnsIndex.reserve(columns.size()); @@ -138,98 +146,183 @@ class TJsonParser::TImpl { Buffer.AddMessages(messages); } - const TVector>& Parse() { + const TVector& Parse() { Y_ENSURE(Buffer.IsReady(), "Nothing to parse"); const auto [values, size] = Buffer.Finish(); LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); - for (auto& parsedColumn : ParsedValues) { - parsedColumn.clear(); - parsedColumn.reserve(Buffer.NumberValues); - } - - size_t rowId = 0; - simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE); - for (auto document : documents) { - for (auto item : document.get_object()) { - const auto it = ColumnsIndex.find(item.escaped_key().value()); - if (it == ColumnsIndex.end()) { - continue; - } + with_lock (Alloc) { + for (auto& parsedColumn : ParsedValues) { + parsedColumn.clear(); + parsedColumn.reserve(Buffer.NumberValues); + } - const auto& column = Columns[it->second]; - - std::string_view value; - if (item.value().is_null()) { - // TODO: support optional types and create UV - continue; - } else if (column.Type == "Json") { - value = item.value().raw_json().value(); - } else if (column.Type == "String" || column.Type == "Utf8") { - value = item.value().get_string().value(); - } else if (item.value().is_scalar()) { - // TODO: perform type validation and create UV - value = item.value().raw_json_token().value(); - } else { - throw yexception() << "Failed to parse json string, expected scalar type for column '" << it->first << "' with type " << column.Type << " but got nested json, please change column type to Json."; + size_t rowId = 0; + simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE); + for (auto document : documents) { + for (auto item : document.get_object()) { + const auto it = ColumnsIndex.find(item.escaped_key().value()); + if (it == ColumnsIndex.end()) { + continue; + } + + const TColumnDescription& columnDesc = Columns[it->second]; + auto& parsedColumn = ParsedValues[it->second]; + ResizeColumn(columnDesc, parsedColumn, rowId); + + try { + parsedColumn.emplace_back(ParseJsonValue(columnDesc.Type, item.value())); + } catch (...) { + throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson << ", description: " << CurrentExceptionMessage(); + } } + rowId++; + } + Y_ENSURE(rowId == Buffer.NumberValues, "Unexpected number of json documents"); - auto& parsedColumn = ParsedValues[it->second]; - parsedColumn.resize(rowId); - parsedColumn.emplace_back(CreateHolderIfNeeded(values, size, value)); + for (size_t i = 0; i < Columns.size(); ++i) { + ResizeColumn(Columns[i], ParsedValues[i], Buffer.NumberValues); } - rowId++; } - Y_ENSURE(rowId == Buffer.NumberValues, "Unexpected number of json documents"); - for (auto& parsedColumn : ParsedValues) { - parsedColumn.resize(Buffer.NumberValues); - } return ParsedValues; } TString GetDescription() const { TStringBuilder description = TStringBuilder() << "Columns: "; for (const auto& column : Columns) { - description << "'" << column.Name << "':" << column.Type << " "; + description << "'" << column.Name << "':" << column.TypeYson << " "; } description << "\nNumber values in buffer: " << Buffer.NumberValues << ", buffer size: " << Buffer.GetSize() << ", finished: " << Buffer.Finished; return description; } - TString GetDebugString(const TVector>& parsedValues) const { - TStringBuilder result; - for (size_t i = 0; i < Columns.size(); ++i) { - result << "Parsed column '" << Columns[i].Name << "': "; - for (const auto& value : parsedValues[i]) { - result << "'" << value << "' "; + ~TImpl() { + Alloc.Acquire(); + } + +private: + void ResizeColumn(const TColumnDescription& columnDesc, NKikimr::NMiniKQL::TUnboxedValueVector& parsedColumn, size_t size) const { + if (columnDesc.Type->IsOptional()) { + parsedColumn.resize(size); + } else if (Y_UNLIKELY(parsedColumn.size() < size)) { + throw yexception() << "Failed to parse json string, found missing value at offset " << Buffer.Offsets[parsedColumn.size()] << " in non optional column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson; + } + } + + NYql::NUdf::TUnboxedValue ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const { + switch (type->GetKind()) { + case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { + const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type); + if (const auto dataSlot = dataType->GetDataSlot()) { + return ParseJsonValue(*dataSlot, jsonValue); + } + throw yexception() << "unsupported data type with id " << dataType->GetSchemeType(); + } + + case NKikimr::NMiniKQL::TTypeBase::EKind::Optional: { + if (jsonValue.is_null()) { + return NYql::NUdf::TUnboxedValuePod(); + } + return ParseJsonValue(AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType(), jsonValue).MakeOptional(); + } + + default: { + throw yexception() << "unsupported type kind " << type->GetKindAsStr(); + } + } + } + + NYql::NUdf::TUnboxedValue ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const { + const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot); + switch (jsonValue.type()) { + case simdjson::fallback::ondemand::json_type::number: { + try { + switch (dataSlot) { + case NYql::NUdf::EDataSlot::Int8: + return ParseJsonNumber(jsonValue.get_int64().value()); + case NYql::NUdf::EDataSlot::Int16: + return ParseJsonNumber(jsonValue.get_int64().value()); + case NYql::NUdf::EDataSlot::Int32: + return ParseJsonNumber(jsonValue.get_int64().value()); + case NYql::NUdf::EDataSlot::Int64: + return NYql::NUdf::TUnboxedValuePod(jsonValue.get_int64().value()); + + case NYql::NUdf::EDataSlot::Uint8: + return ParseJsonNumber(jsonValue.get_uint64().value()); + case NYql::NUdf::EDataSlot::Uint16: + return ParseJsonNumber(jsonValue.get_uint64().value()); + case NYql::NUdf::EDataSlot::Uint32: + return ParseJsonNumber(jsonValue.get_uint64().value()); + case NYql::NUdf::EDataSlot::Uint64: + return NYql::NUdf::TUnboxedValuePod(jsonValue.get_uint64().value()); + + case NYql::NUdf::EDataSlot::Double: + return NYql::NUdf::TUnboxedValuePod(jsonValue.get_double().value()); + case NYql::NUdf::EDataSlot::Float: + return NYql::NUdf::TUnboxedValuePod(static_cast(jsonValue.get_double().value())); + + default: + throw yexception() << "number value is not expected for data type " << typeInfo.Name; + } + } catch (...) { + throw yexception() << "failed to parse data type " << typeInfo.Name << " from json number (raw: '" << TruncateString(jsonValue.raw_json_token()) << "'), error: " << CurrentExceptionMessage(); + } + } + + case simdjson::fallback::ondemand::json_type::string: { + const auto rawString = jsonValue.get_string().value(); + if (NYql::NUdf::TUnboxedValuePod result = NKikimr::NMiniKQL::ValueFromString(dataSlot, rawString)) { + return result; + } + throw yexception() << "failed to parse data type " << typeInfo.Name << " from json string: '" << TruncateString(rawString) << "'"; + } + + case simdjson::fallback::ondemand::json_type::array: + case simdjson::fallback::ondemand::json_type::object: { + const auto rawJson = jsonValue.raw_json().value(); + if (dataSlot != NYql::NUdf::EDataSlot::Json) { + throw yexception() << "found unexpected nested value (raw: '" << TruncateString(rawJson) << "'), expected data type " <(diff) < size) { - return value; + template + static NYql::NUdf::TUnboxedValue ParseJsonNumber(TJsonNumber number) { + if (number < std::numeric_limits::min() || std::numeric_limits::max() < number) { + throw yexception() << "number is out of range"; } - return Buffer.AddHolder(value); + return NYql::NUdf::TUnboxedValuePod(static_cast(number)); } - static TString SkipOptional(const TString& type) { - if (type.StartsWith("Optional")) { - TStringBuf optionalType = type; - Y_ENSURE(optionalType.SkipPrefix("Optional<"), "Unexpected type"); - Y_ENSURE(optionalType.ChopSuffix(">"), "Unexpected type"); - return TString(optionalType); + static TString TruncateString(std::string_view rawString, size_t maxSize = 1_KB) { + if (rawString.size() <= maxSize) { + return TString(rawString); } - return type; + return TStringBuilder() << rawString.substr(0, maxSize) << " truncated..."; } private: + NKikimr::NMiniKQL::TScopedAlloc Alloc; + NKikimr::NMiniKQL::TTypeEnvironment TypeEnv; + const ui64 BatchSize; const TDuration BatchCreationTimeout; TVector Columns; @@ -238,7 +331,7 @@ class TJsonParser::TImpl { TJsonParserBuffer Buffer; simdjson::ondemand::parser Parser; - TVector> ParsedValues; + TVector ParsedValues; }; TJsonParser::TJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) @@ -268,7 +361,7 @@ const TVector& TJsonParser::GetOffsets() const { return Impl->GetOffsets(); } -const TVector>& TJsonParser::Parse() { +const TVector& TJsonParser::Parse() { return Impl->Parse(); } @@ -276,10 +369,6 @@ TString TJsonParser::GetDescription() const { return Impl->GetDescription(); } -TString TJsonParser::GetDebugString(const TVector>& parsedValues) const { - return Impl->GetDebugString(parsedValues); -} - std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) { return std::unique_ptr(new TJsonParser(columns, types, batchSize, batchCreationTimeout)); } diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index 4f5f2b14e3a2..878fa534f015 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NFq { @@ -15,10 +17,9 @@ class TJsonParser { const TVector& GetOffsets() const; void AddMessages(const TVector& messages); - const TVector>& Parse(); + const TVector& Parse(); TString GetDescription() const; - TString GetDebugString(const TVector>& parsedValues) const; private: class TImpl; diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 204e6ff60bda..a985beffad7d 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -189,7 +189,7 @@ class TTopicSession : public TActorBootstrapped { void SubscribeOnNextEvent(); void SendToParsing(const TVector& messages); void DoParsing(bool force = false); - void DoFiltering(const TVector& offsets, const TVector>& parsedValues); + void DoFiltering(const TVector& offsets, const TVector& parsedValues); void SendData(ClientsInfo& info); void UpdateParser(); void FatalError(const TString& message, const std::unique_ptr* filter = nullptr); @@ -215,7 +215,7 @@ class TTopicSession : public TActorBootstrapped { void SendStatistic(); void SendSessionError(NActors::TActorId readActorId, const TString& message); - TVector> RebuildJson(const ClientsInfo& info, const TVector>& parsedValues); + TVector RebuildJson(const ClientsInfo& info, const TVector& parsedValues); void UpdateParserSchema(const TParserInputType& inputType); void UpdateFieldsIds(ClientsInfo& clientInfo); @@ -387,15 +387,15 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) { CreateTopicSession(); } -TVector> TTopicSession::RebuildJson(const ClientsInfo& info, const TVector>& parsedValues) { - TVector> result; +TVector TTopicSession::RebuildJson(const ClientsInfo& info, const TVector& parsedValues) { + TVector result; const auto& offsets = ParserSchema.FieldsMap; result.reserve(info.FieldsIds.size()); for (auto fieldId : info.FieldsIds) { Y_ENSURE(fieldId < offsets.size(), "fieldId " << fieldId << ", offsets.size() " << offsets.size()); auto offset = offsets[fieldId]; Y_ENSURE(offset < parsedValues.size(), "offset " << offset << ", jsonBatch.size() " << parsedValues.size()); - result.push_back(parsedValues[offset]); + result.push_back(&parsedValues[offset]); } return result; } @@ -584,9 +584,9 @@ void TTopicSession::DoParsing(bool force) { } } -void TTopicSession::DoFiltering(const TVector& offsets, const TVector>& parsedValues) { +void TTopicSession::DoFiltering(const TVector& offsets, const TVector& parsedValues) { Y_ENSURE(parsedValues, "Expected non empty schema"); - LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets.front() << ", last offset: " << offsets.back() << ", data:\n" << Parser->GetDebugString(parsedValues)); + LOG_ROW_DISPATCHER_TRACE("SendToFiltering, first offset: " << offsets.front() << ", last offset: " << offsets.back()); for (auto& [actorId, info] : Clients) { try { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 1e2befea3778..83d20a37ecf1 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -8,6 +8,9 @@ #include #include #include + +#include + #include namespace { @@ -19,7 +22,9 @@ class TFixture : public NUnitTest::TBaseFixture { public: TFixture() - : Runtime(true) {} + : Runtime(true) + , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) + {} void SetUp(NUnitTest::TTestContext&) override { TAutoPtr app = new TAppPrepare(); @@ -30,6 +35,9 @@ class TFixture : public NUnitTest::TBaseFixture { } void TearDown(NUnitTest::TTestContext& /* context */) override { + with_lock (Alloc) { + Holders.clear(); + } Filter.reset(); } @@ -45,9 +53,43 @@ class TFixture : public NUnitTest::TBaseFixture { callback); } + const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(size_t size, std::function valueCreator) { + with_lock (Alloc) { + Holders.emplace_front(); + for (size_t i = 0; i < size; ++i) { + Holders.front().emplace_back(valueCreator(i)); + } + return &Holders.front(); + } + } + + template + const NKikimr::NMiniKQL::TUnboxedValueVector* MakeVector(const TVector& values, bool optional = false) { + return MakeVector(values.size(), [&](size_t i) { + NYql::NUdf::TUnboxedValuePod unboxedValue = NYql::NUdf::TUnboxedValuePod(values[i]); + return optional ? unboxedValue.MakeOptional() : unboxedValue; + }); + } + + const NKikimr::NMiniKQL::TUnboxedValueVector* MakeStringVector(const TVector& values, bool optional = false) { + return MakeVector(values.size(), [&](size_t i) { + NYql::NUdf::TUnboxedValuePod stringValue = NKikimr::NMiniKQL::MakeString(values[i]); + return optional ? stringValue.MakeOptional() : stringValue; + }); + } + + const NKikimr::NMiniKQL::TUnboxedValueVector* MakeEmptyVector(size_t size) { + return MakeVector(size, [&](size_t) { + return NYql::NUdf::TUnboxedValuePod(); + }); + } + TActorSystemStub actorSystemStub; NActors::TTestActorRuntime Runtime; std::unique_ptr Filter; + + NKikimr::NMiniKQL::TScopedAlloc Alloc; + TList Holders; }; Y_UNIT_TEST_SUITE(TJsonFilterTests) { @@ -55,13 +97,13 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { TMap result; MakeFilter( {"a1", "a2", "a@3"}, - {"String", "UInt64", "Optional"}, + {"[DataType; String]", "[DataType; Uint64]", "[OptionalType; [DataType; String]]"}, "where a2 > 100", [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {{"hello1"}, {"99"}, {"zapuskaem"}}); - Filter->Push({6}, {{"hello2"}, {"101"}, {"gusya"}}); + Filter->Push({5}, {MakeStringVector({"hello1"}), MakeVector({99}), MakeStringVector({"zapuskaem"}, true)}); + Filter->Push({6}, {MakeStringVector({"hello2"}), MakeVector({101}), MakeStringVector({"gusya"}, true)}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101,"a@3":"gusya"})", result[6]); } @@ -70,55 +112,46 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { TMap result; MakeFilter( {"a2", "a1"}, - {"UInt64", "String"}, + {"[DataType; Uint64]", "[DataType; String]"}, "where a2 > 100", [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {{"99"}, {"hello1"}}); - Filter->Push({6}, {{"101"}, {"hello2"}}); + Filter->Push({5}, {MakeVector({99}), MakeStringVector({"hello1"})}); + Filter->Push({6}, {MakeVector({101}), MakeStringVector({"hello2"})}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); - UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({7}, {{"102"}, {std::string_view()}}), yexception, "Failed to unwrap empty optional"); - UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({8}, {{"str"}, {"hello3"}}), yexception, "Failed to unwrap empty optional"); } Y_UNIT_TEST_F(ManyValues, TFixture) { TMap result; MakeFilter( - {"a1", "a2"}, - {"String", "UInt64"}, + {"a1", "a2", "a3"}, + {"[DataType; String]", "[DataType; Uint64]", "[DataType; String]"}, "where a2 > 100", [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5, 6}, {{"hello1", "hello2"}, {"99", "101"}}); - UNIT_ASSERT_VALUES_EQUAL(1, result.size()); - UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); + const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; + for (ui64 i = 0; i < 5; ++i) { + Filter->Push({2 * i, 2 * i + 1}, {MakeStringVector({"hello1", "hello2"}), MakeVector({99, 101}), MakeStringVector({largeString, largeString})}); + UNIT_ASSERT_VALUES_EQUAL_C(i + 1, result.size(), i); + UNIT_ASSERT_VALUES_EQUAL_C(TStringBuilder() << "{\"a1\":\"hello2\",\"a2\":101,\"a3\":\"" << largeString << "\"}", result[2 * i + 1], i); + } } Y_UNIT_TEST_F(NullValues, TFixture) { TMap result; MakeFilter( {"a1", "a2"}, - {"Optional", "String"}, + {"[OptionalType; [DataType; Uint64]]", "[DataType; String]"}, "where a1 is null", [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push({5}, {{std::string_view()}, {"str"}}); + Filter->Push({5}, {MakeEmptyVector(1), MakeStringVector({"str"})}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":null,"a2":"str"})", result[5]); - UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({5}, {{"hello1"}, {"str"}}), yexception, "Failed to unwrap empty optional"); - } - - Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { - MakeFilter( - {"a1", "a2"}, - {"String", "UInt64"}, - "where Unwrap(a2) = 1", - [&](ui64, const TString&) { }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push({5}, {{"99"}, {"hello1"}}), yexception, "Failed to unwrap empty optional"); } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 28242a1ebc74..28227e0e8591 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -40,23 +40,15 @@ class TFixture : public NUnitTest::TBaseFixture { } void MakeParser(TVector columns) { - MakeParser(columns, TVector(columns.size(), "String")); + MakeParser(columns, TVector(columns.size(), "[DataType; String]")); } - void PushToParser(ui64 offset, const TString& data) { + const TVector& PushToParser(ui64 offset, const TString& data) { Parser->AddMessages({GetMessage(offset, data)}); - ParsedValues = Parser->Parse(); - ResultNumberValues = ParsedValues ? ParsedValues.front().size() : 0; - } - - TVector GetParsedRow(size_t id) const { - TVector result; - result.reserve(ParsedValues.size()); - for (const auto& columnResult : ParsedValues) { - result.emplace_back(columnResult[id]); - } - return result; + const auto& parsedValues = Parser->Parse(); + ResultNumberValues = parsedValues ? parsedValues.front().size() : 0; + return parsedValues; } static NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage GetMessage(ui64 offset, const TString& data) { @@ -67,54 +59,67 @@ class TFixture : public NUnitTest::TBaseFixture { TActorSystemStub actorSystemStub; NActors::TTestActorRuntime Runtime; std::unique_ptr Parser; - ui64 ResultNumberValues = 0; - TVector> ParsedValues; }; Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(Simple1, TFixture) { - MakeParser({"a1", "a2"}, {"String", "Optional"}); - PushToParser(42,R"({"a1": "hello1", "a2": 101, "event": "event1"})"); + MakeParser({"a1", "a2"}, {"[DataType; String]", "[OptionalType; [DataType; Uint64]]"}); + const auto& result = PushToParser(42,R"({"a1": "hello1", "a2": 101, "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); - UNIT_ASSERT_VALUES_EQUAL("101", result.back()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(101, result[1][0].GetOptionalValue().Get()); } Y_UNIT_TEST_F(Simple2, TFixture) { MakeParser({"a2", "a1"}); - PushToParser(42,R"({"a1": "hello1", "a2": "101", "event": "event1"})"); + const auto& result = PushToParser(42,R"({"a1": "hello1", "a2": "101", "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", result.front()); - UNIT_ASSERT_VALUES_EQUAL("hello1", result.back()); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); } Y_UNIT_TEST_F(Simple3, TFixture) { MakeParser({"a1", "a2"}); - PushToParser(42,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); + const auto& result = PushToParser(42,R"({"a2": "hello1", "a1": "101", "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("101", result.front()); - UNIT_ASSERT_VALUES_EQUAL("hello1", result.back()); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); } Y_UNIT_TEST_F(Simple4, TFixture) { MakeParser({"a2", "a1"}); - PushToParser(42, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); + const auto& result = PushToParser(42, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); - const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); - UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); - UNIT_ASSERT_VALUES_EQUAL("101", result.back()); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("101", TString(result[1][0].AsStringRef())); + } + + Y_UNIT_TEST_F(LargeStrings, TFixture) { + MakeParser({"col"}); + + const TString largeString = "abcdefghjkl1234567890+abcdefghjkl1234567890"; + const TString jsonString = TStringBuilder() << "{\"col\": \"" << largeString << "\"}"; + Parser->AddMessages({ + GetMessage(42, jsonString), + GetMessage(43, jsonString) + }); + + const auto& result = Parser->Parse(); + ResultNumberValues = result.front().size(); + UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); + + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL(largeString, TString(result[0][1].AsStringRef())); } Y_UNIT_TEST_F(ManyValues, TFixture) { @@ -126,68 +131,107 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { GetMessage(44, R"({"a2": "101", "a1": "hello1", "event": "event3"})") }); - ParsedValues = Parser->Parse(); - ResultNumberValues = ParsedValues.front().size(); + const auto& result = Parser->Parse(); + ResultNumberValues = result.front().size(); UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); for (size_t i = 0; i < ResultNumberValues; ++i) { - const auto& result = GetParsedRow(i); - UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i); - UNIT_ASSERT_VALUES_EQUAL_C("hello1", result.front(), i); - UNIT_ASSERT_VALUES_EQUAL_C("101", result.back(), i); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(result[0][i].AsStringRef()), i); + UNIT_ASSERT_VALUES_EQUAL_C("101", TString(result[1][i].AsStringRef()), i); } } Y_UNIT_TEST_F(MissingFields, TFixture) { - MakeParser({"a1", "a2"}); + MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; String]]", "[OptionalType; [DataType; Uint64]]"}); Parser->AddMessages({ - GetMessage(42, R"({"a1": "hello1", "a2": "101", "event": "event1"})"), + GetMessage(42, R"({"a1": "hello1", "a2": 101 , "event": "event1"})"), GetMessage(43, R"({"a1": "hello1", "event": "event2"})"), GetMessage(44, R"({"a2": "101", "a1": null, "event": "event3"})") }); - ParsedValues = Parser->Parse(); - ResultNumberValues = ParsedValues.front().size(); + const auto& result = Parser->Parse(); + ResultNumberValues = result.front().size(); UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); for (size_t i = 0; i < ResultNumberValues; ++i) { - const auto& result = GetParsedRow(i); - UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i); - UNIT_ASSERT_VALUES_EQUAL_C(i != 2 ? "hello1" : "", result.front(), i); - UNIT_ASSERT_VALUES_EQUAL_C(i != 1 ? "101" : "", result.back(), i); + if (i == 2) { + UNIT_ASSERT_C(!result[0][i], i); + } else { + NYql::NUdf::TUnboxedValue value = result[0][i].GetOptionalValue(); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", TString(value.AsStringRef()), i); + } + if (i == 1) { + UNIT_ASSERT_C(!result[1][i], i); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(101, result[1][i].GetOptionalValue().Get(), i); + } } } Y_UNIT_TEST_F(NestedTypes, TFixture) { - MakeParser({"nested", "a1"}, {"Optional", "String"}); + MakeParser({"nested", "a1"}, {"[OptionalType; [DataType; Json]]", "[DataType; String]"}); Parser->AddMessages({ GetMessage(42, R"({"a1": "hello1", "nested": {"key": "value"}})"), - GetMessage(43, R"({"a1": "hello1", "nested": ["key1", "key2"]})") + GetMessage(43, R"({"a1": "hello2", "nested": ["key1", "key2"]})") }); - ParsedValues = Parser->Parse(); - ResultNumberValues = ParsedValues.front().size(); + const auto& result = Parser->Parse(); + ResultNumberValues = result.front().size(); UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); - const auto& nestedJson = GetParsedRow(0); - UNIT_ASSERT_VALUES_EQUAL(2, nestedJson.size()); - UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", nestedJson.front()); - UNIT_ASSERT_VALUES_EQUAL("hello1", nestedJson.back()); + UNIT_ASSERT_VALUES_EQUAL(2, result.size()); + UNIT_ASSERT_VALUES_EQUAL("{\"key\": \"value\"}", TString(result[0][0].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello1", TString(result[1][0].AsStringRef())); + + UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", TString(result[0][1].AsStringRef())); + UNIT_ASSERT_VALUES_EQUAL("hello2", TString(result[1][1].AsStringRef())); + } + + Y_UNIT_TEST_F(SimpleBooleans, TFixture) { + MakeParser({"a"}, {"[DataType; Bool]"}); + Parser->AddMessages({ + GetMessage(42, R"({"a": true})"), + GetMessage(43, R"({"a": false})") + }); + + const auto& result = Parser->Parse(); + ResultNumberValues = result.front().size(); + UNIT_ASSERT_VALUES_EQUAL(2, ResultNumberValues); + + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(true, result[0][0].Get()); + UNIT_ASSERT_VALUES_EQUAL(false, result[0][1].Get()); + } + + Y_UNIT_TEST_F(MissingFieldsValidation, TFixture) { + MakeParser({"a1", "a2"}, {"[DataType; String]", "[DataType; Uint64]"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": "hello1", "a2": null, "event": "event1"})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [DataType; Uint64], description: (yexception) found unexpected null value, expected non optional data type Uint64"); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": 105, "event": "event1"})"), yexception, "Failed to parse json string, found missing value at offset 42 in non optional column 'a1' with type [DataType; String]"); + } + + Y_UNIT_TEST_F(TypeKindsValidation, TFixture) { + MakeParser({"a2", "a1"}, {"[OptionalType; [DataType; String]]", "[ListType; [DataType; String]]"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": "hello1", "a1": ["key", "value"]})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a1' with type [ListType; [DataType; String]], description: (yexception) unsupported type kind List"); + } - const auto& nestedList = GetParsedRow(1); - UNIT_ASSERT_VALUES_EQUAL(2, nestedList.size()); - UNIT_ASSERT_VALUES_EQUAL("[\"key1\", \"key2\"]", nestedList.front()); - UNIT_ASSERT_VALUES_EQUAL("hello1", nestedList.back()); + Y_UNIT_TEST_F(NumbersValidation, TFixture) { + MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; String]]", "[DataType; Uint8]"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": 456, "a2": 42})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a1' with type [OptionalType; [DataType; String]], description: (yexception) failed to parse data type String from json number (raw: '456'), error: (yexception) number value is not expected for data type String"); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": "456", "a2": -42})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [DataType; Uint8], description: (yexception) failed to parse data type Uint8 from json number (raw: '-42'), error: (simdjson::simdjson_error) INCORRECT_TYPE: The JSON element does not have the requested type."); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": "str", "a2": 99999})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [DataType; Uint8], description: (yexception) failed to parse data type Uint8 from json number (raw: '99999'), error: (yexception) number is out of range"); } - Y_UNIT_TEST_F(StringTypeValidation, TFixture) { - MakeParser({"a1"}, {"String"}); - UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": 1234})"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); + Y_UNIT_TEST_F(NestedJsonValidation, TFixture) { + MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; Json]]", "[OptionalType; [DataType; String]]"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": {"key": "value"}, "a2": {"key2": "value2"}})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [OptionalType; [DataType; String]], description: (yexception) found unexpected nested value (raw: '{\"key2\": \"value2\"}'), expected data type String, please use Json type for nested values"); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": {"key" "value"}, "a2": "str"})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a1' with type [OptionalType; [DataType; Json]], description: (simdjson::simdjson_error) TAPE_ERROR: The JSON document has an improper structure: missing or superfluous commas, braces, missing keys, etc."); } - Y_UNIT_TEST_F(JsonTypeValidation, TFixture) { - MakeParser({"a1"}, {"Int32"}); - UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": {"key": "value"}})"), yexception, "Failed to parse json string, expected scalar type for column 'a1' with type Int32 but got nested json, please change column type to Json."); + Y_UNIT_TEST_F(BoolsValidation, TFixture) { + MakeParser({"a1", "a2"}, {"[OptionalType; [DataType; String]]", "[DataType; Bool]"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": true, "a2": false})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a1' with type [OptionalType; [DataType; String]], description: (yexception) found unexpected bool value, expected data type String"); } Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 75b4ab84e648..e52f18d9a2e8 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -100,8 +100,8 @@ class TFixture : public NUnitTest::TBaseFixture { settings.SetDatabase(GetDefaultPqDatabase()); settings.AddColumns("dt"); settings.AddColumns("value"); - settings.AddColumnTypes("Uint64"); - settings.AddColumnTypes("String"); + settings.AddColumnTypes("[DataType; Uint64]"); + settings.AddColumnTypes("[DataType; String]"); if (!emptyPredicate) { settings.SetPredicate("WHERE true"); } @@ -387,7 +387,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { auto source1 = BuildSource(topicName); auto source2 = BuildSource(topicName); source2.AddColumns("field1"); - source2.AddColumnTypes("String"); + source2.AddColumnTypes("[DataType; String]"); StartSession(ReadActorId1, source1); StartSession(ReadActorId2, source2); @@ -395,7 +395,6 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { TString json1 = "{\"dt\":101,\"value\":\"value1\", \"field1\":\"field1\"}"; TString json2 = "{\"dt\":102,\"value\":\"value2\", \"field1\":\"field2\"}"; - Sleep(TDuration::Seconds(3)); PQWrite({ json1, json2 }, topicName); ExpectNewDataArrived({ReadActorId1, ReadActorId2}); ExpectMessageBatch(ReadActorId1, { "{\"dt\":101,\"value\":\"value1\"}", "{\"dt\":102,\"value\":\"value2\"}" }); @@ -403,7 +402,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { auto source3 = BuildSource(topicName); source3.AddColumns("field2"); - source3.AddColumnTypes("String"); + source3.AddColumnTypes("[DataType; String]"); auto readActorId3 = Runtime.AllocateEdgeActor(); StartSession(readActorId3, source3); diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index 530bda256dc0..f643c08e5876 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -257,7 +258,7 @@ class TPqDqIntegration: public TDqIntegrationBase { const auto rowSchema = topic.RowSpec().Ref().GetTypeAnn()->Cast()->GetType()->Cast(); for (const auto& item : rowSchema->GetItems()) { srcDesc.AddColumns(TString(item->GetName())); - srcDesc.AddColumnTypes(FormatType(item->GetItemType())); + srcDesc.AddColumnTypes(NCommon::WriteTypeToYson(item->GetItemType(), NYT::NYson::EYsonFormat::Text)); } NYql::NConnector::NApi::TPredicate predicateProto; diff --git a/ydb/tests/fq/yds/test_row_dispatcher.py b/ydb/tests/fq/yds/test_row_dispatcher.py index 002ff5f59465..61826b7f6581 100644 --- a/ydb/tests/fq/yds/test_row_dispatcher.py +++ b/ydb/tests/fq/yds/test_row_dispatcher.py @@ -233,15 +233,17 @@ def test_nested_types(self, kikimr, client): query_id = start_yds_query(kikimr, client, sql) wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1) + large_string = "abcdefghjkl1234567890+abcdefghjkl1234567890" data = [ - '{"time": 101, "data": {"key": "value"}, "event": "event1"}', - '{"time": 102, "data": ["key1", "key2"], "event": "event2"}', + '{"time": 101, "data": {"key": "value", "second_key":"' + large_string + '"}, "event": "event1"}', + '{"time": 102, "data": ["key1", "key2", "' + large_string + '"], "event": "event2"}', + '{"time": 103, "data": ["' + large_string + '"], "event": "event3"}', ] self.write_stream(data) expected = [ - '{"key": "value"}', - '["key1", "key2"]' + '{"key": "value", "second_key":"' + large_string + '"}', + '["key1", "key2", "' + large_string + '"]' ] assert self.read_stream(len(expected), topic_path=self.output_topic) == expected From 569893a5f3b202862532ee629bbeed0d2b887416 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 23 Oct 2024 11:48:07 +0000 Subject: [PATCH 2/6] Removed unused include --- ydb/core/fq/libs/row_dispatcher/json_filter.cpp | 3 --- ydb/core/fq/libs/row_dispatcher/json_parser.cpp | 4 +++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index c4f1e3dd2436..fbc89e816b47 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -11,9 +11,6 @@ #include #include -#include - - namespace { using TCallback = NFq::TJsonFilter::TCallback; diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 2afcc8dbf77d..69a592f7fe19 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -179,7 +179,9 @@ class TJsonParser::TImpl { } rowId++; } - Y_ENSURE(rowId == Buffer.NumberValues, "Unexpected number of json documents"); + if (rowId < Buffer.NumberValues) { + throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows but got " << rowId; + } for (size_t i = 0; i < Columns.size(); ++i) { ResizeColumn(Columns[i], ParsedValues[i], Buffer.NumberValues); From 21b4991909dbd61aeff3bee85e4e79b7dc44fe89 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 23 Oct 2024 16:35:39 +0000 Subject: [PATCH 3/6] Added ref locking --- .../fq/libs/row_dispatcher/json_parser.cpp | 23 +++++++++++++------ .../libs/row_dispatcher/ut/json_filter_ut.cpp | 6 +++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 69a592f7fe19..a0e79e7b02b0 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -153,10 +153,7 @@ class TJsonParser::TImpl { LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); with_lock (Alloc) { - for (auto& parsedColumn : ParsedValues) { - parsedColumn.clear(); - parsedColumn.reserve(Buffer.NumberValues); - } + ClearColumns(Buffer.NumberValues); size_t rowId = 0; simdjson::ondemand::document_stream documents = Parser.iterate_many(values, size, simdjson::ondemand::DEFAULT_BATCH_SIZE); @@ -173,6 +170,7 @@ class TJsonParser::TImpl { try { parsedColumn.emplace_back(ParseJsonValue(columnDesc.Type, item.value())); + Alloc.Ref().LockObject(parsedColumn.back()); } catch (...) { throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson << ", description: " << CurrentExceptionMessage(); } @@ -202,9 +200,20 @@ class TJsonParser::TImpl { ~TImpl() { Alloc.Acquire(); + ClearColumns(0); } private: + void ClearColumns(size_t reserveSize) { + for (auto& parsedColumn : ParsedValues) { + for (const auto& value : parsedColumn) { + Alloc.Ref().UnlockObject(value); + } + parsedColumn.clear(); + parsedColumn.reserve(reserveSize); + } + } + void ResizeColumn(const TColumnDescription& columnDesc, NKikimr::NMiniKQL::TUnboxedValueVector& parsedColumn, size_t size) const { if (columnDesc.Type->IsOptional()) { parsedColumn.resize(size); @@ -213,7 +222,7 @@ class TJsonParser::TImpl { } } - NYql::NUdf::TUnboxedValue ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const { + NYql::NUdf::TUnboxedValuePod ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const { switch (type->GetKind()) { case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type); @@ -236,7 +245,7 @@ class TJsonParser::TImpl { } } - NYql::NUdf::TUnboxedValue ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const { + NYql::NUdf::TUnboxedValuePod ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const { const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot); switch (jsonValue.type()) { case simdjson::fallback::ondemand::json_type::number: { @@ -307,7 +316,7 @@ class TJsonParser::TImpl { } template - static NYql::NUdf::TUnboxedValue ParseJsonNumber(TJsonNumber number) { + static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) { if (number < std::numeric_limits::min() || std::numeric_limits::max() < number) { throw yexception() << "number is out of range"; } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 83d20a37ecf1..fecc26e4a73c 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -36,6 +36,11 @@ class TFixture : public NUnitTest::TBaseFixture { void TearDown(NUnitTest::TTestContext& /* context */) override { with_lock (Alloc) { + for (const auto& holder : Holders) { + for (const auto& value : holder) { + Alloc.Ref().UnlockObject(value); + } + } Holders.clear(); } Filter.reset(); @@ -58,6 +63,7 @@ class TFixture : public NUnitTest::TBaseFixture { Holders.emplace_front(); for (size_t i = 0; i < size; ++i) { Holders.front().emplace_back(valueCreator(i)); + Alloc.Ref().LockObject(Holders.front().back()); } return &Holders.front(); } From b274c81a03b7a50f2ea8f946680054eebad8f5e3 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 23 Oct 2024 17:39:51 +0000 Subject: [PATCH 4/6] Fixed number rows validation --- ydb/core/fq/libs/row_dispatcher/json_parser.cpp | 4 ++-- ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index a0e79e7b02b0..6eb60e2f0f72 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -177,8 +177,8 @@ class TJsonParser::TImpl { } rowId++; } - if (rowId < Buffer.NumberValues) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows but got " << rowId; + if (rowId != Buffer.NumberValues) { + throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId; } for (size_t i = 0; i < Columns.size(); ++i) { diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index 28227e0e8591..b3a1e8d5ab30 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -235,8 +235,9 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { } Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { - MakeParser({"a2", "a1"}); + MakeParser({"a"}); UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a": "value1"} {"a": "value2"})"), yexception, "Failed to parse json messages, expected 1 json rows from offset 42 but got 2"); } } From d46e3eec8cbc52e12f09b94f69411e65eb9651c4 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 24 Oct 2024 16:37:33 +0000 Subject: [PATCH 5/6] Fixed perf --- .../fq/libs/row_dispatcher/json_parser.cpp | 351 ++++++++++-------- .../libs/row_dispatcher/ut/json_parser_ut.cpp | 9 +- 2 files changed, 209 insertions(+), 151 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 6eb60e2f0f72..2092a12fee56 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -72,6 +72,183 @@ struct TJsonParserBuffer { TStringBuilder Values = {}; }; +class TColumnParser { + using TParser = std::function; + +public: + const std::string Name; + const TString TypeYson; + const NKikimr::NMiniKQL::TType* TypeMkql; + const bool IsOptional = false; + size_t NumberValues = 0; + +public: + TColumnParser(const TString& name, const TString& typeYson, NKikimr::NMiniKQL::TProgramBuilder& programBuilder) + : Name(name) + , TypeYson(typeYson) + , TypeMkql(NYql::NCommon::ParseTypeFromYson(TStringBuf(typeYson), programBuilder, Cerr)) + , IsOptional(TypeMkql->IsOptional()) + , NumberValues(0) + { + try { + Parser = CreateParser(TypeMkql); + } catch (...) { + throw yexception() << "Failed to create parser for column '" << Name << "' with type " << TypeYson << ", description: " << CurrentExceptionMessage(); + } + } + + void ParseJsonValue(simdjson::fallback::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + Parser(jsonValue, resultValue); + NumberValues++; + } + + void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const { + if (Y_UNLIKELY(!IsOptional && NumberValues < expectedNumberValues)) { + throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - NumberValues << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson; + } + } + +private: + TParser CreateParser(const NKikimr::NMiniKQL::TType* type, bool optional = false) const { + switch (type->GetKind()) { + case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { + const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type); + if (const auto dataSlot = dataType->GetDataSlot()) { + return GetJsonValueParser(*dataSlot, optional); + } + throw yexception() << "unsupported data type with id " << dataType->GetSchemeType(); + } + + case NKikimr::NMiniKQL::TTypeBase::EKind::Optional: { + return AddOptional(CreateParser(AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType(), true)); + } + + default: { + throw yexception() << "unsupported type kind " << type->GetKindAsStr(); + } + } + } + + static TParser AddOptional(TParser parser) { + return [parser](simdjson::fallback::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + parser(std::move(jsonValue), resultValue); + if (resultValue) { + resultValue = resultValue.MakeOptional(); + } + }; + } + + static TParser GetJsonValueParser(NYql::NUdf::EDataSlot dataSlot, bool optional) { + const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot); + return [dataSlot, optional, &typeInfo](simdjson::fallback::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + switch (jsonValue.type()) { + case simdjson::fallback::ondemand::json_type::number: { + try { + switch (dataSlot) { + case NYql::NUdf::EDataSlot::Int8: + resultValue = ParseJsonNumber(jsonValue.get_int64().value()); + break; + case NYql::NUdf::EDataSlot::Int16: + resultValue = ParseJsonNumber(jsonValue.get_int64().value()); + break; + case NYql::NUdf::EDataSlot::Int32: + resultValue = ParseJsonNumber(jsonValue.get_int64().value()); + break; + case NYql::NUdf::EDataSlot::Int64: + resultValue = NYql::NUdf::TUnboxedValuePod(jsonValue.get_int64().value()); + break; + + case NYql::NUdf::EDataSlot::Uint8: + resultValue = ParseJsonNumber(jsonValue.get_uint64().value()); + break; + case NYql::NUdf::EDataSlot::Uint16: + resultValue = ParseJsonNumber(jsonValue.get_uint64().value()); + break; + case NYql::NUdf::EDataSlot::Uint32: + resultValue = ParseJsonNumber(jsonValue.get_uint64().value()); + break; + case NYql::NUdf::EDataSlot::Uint64: + resultValue = NYql::NUdf::TUnboxedValuePod(jsonValue.get_uint64().value()); + break; + + case NYql::NUdf::EDataSlot::Double: + resultValue = NYql::NUdf::TUnboxedValuePod(jsonValue.get_double().value()); + break; + case NYql::NUdf::EDataSlot::Float: + resultValue = NYql::NUdf::TUnboxedValuePod(static_cast(jsonValue.get_double().value())); + break; + + default: + throw yexception() << "number value is not expected for data type " << typeInfo.Name; + } + } catch (...) { + throw yexception() << "failed to parse data type " << typeInfo.Name << " from json number (raw: '" << TruncateString(jsonValue.raw_json_token()) << "'), error: " << CurrentExceptionMessage(); + } + break; + } + + case simdjson::fallback::ondemand::json_type::string: { + const auto rawString = jsonValue.get_string().value(); + resultValue = NKikimr::NMiniKQL::ValueFromString(dataSlot, rawString); + if (Y_UNLIKELY(!resultValue)) { + throw yexception() << "failed to parse data type " << typeInfo.Name << " from json string: '" << TruncateString(rawString) << "'"; + } + Y_ENSURE(resultValue.LockRef() == 1); + break; + } + + case simdjson::fallback::ondemand::json_type::array: + case simdjson::fallback::ondemand::json_type::object: { + const auto rawJson = jsonValue.raw_json().value(); + if (Y_UNLIKELY(dataSlot != NYql::NUdf::EDataSlot::Json)) { + throw yexception() << "found unexpected nested value (raw: '" << TruncateString(rawJson) << "'), expected data type " < + static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) { + if (number < std::numeric_limits::min() || std::numeric_limits::max() < number) { + throw yexception() << "number is out of range"; + } + return NYql::NUdf::TUnboxedValuePod(static_cast(number)); + } + + static TString TruncateString(std::string_view rawString, size_t maxSize = 1_KB) { + if (rawString.size() <= maxSize) { + return TString(rawString); + } + return TStringBuilder() << rawString.substr(0, maxSize) << " truncated..."; + } + +private: + TParser Parser; +}; + } // anonymous namespace namespace NFq { @@ -79,12 +256,6 @@ namespace NFq { //// TJsonParser class TJsonParser::TImpl { - struct TColumnDescription { - std::string Name; - TString TypeYson; - NKikimr::NMiniKQL::TType* Type; - }; - public: TImpl(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) @@ -94,7 +265,6 @@ class TJsonParser::TImpl { , ParsedValues(columns.size()) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); - LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name()); with_lock (Alloc) { auto functonRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, {}); @@ -102,11 +272,7 @@ class TJsonParser::TImpl { Columns.reserve(columns.size()); for (size_t i = 0; i < columns.size(); i++) { - Columns.emplace_back(TColumnDescription{ - .Name = columns[i], - .TypeYson = types[i], - .Type = NYql::NCommon::ParseTypeFromYson(TStringBuf(types[i]), programBuilder, Cerr) - }); + Columns.emplace_back(columns[i], types[i], programBuilder); } } @@ -116,6 +282,8 @@ class TJsonParser::TImpl { } Buffer.Reserve(BatchSize, 1); + + LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name()); Parser.threaded = false; } @@ -164,25 +332,23 @@ class TJsonParser::TImpl { continue; } - const TColumnDescription& columnDesc = Columns[it->second]; - auto& parsedColumn = ParsedValues[it->second]; - ResizeColumn(columnDesc, parsedColumn, rowId); - + const size_t columnId = it->second; + auto& columnParser = Columns[columnId]; try { - parsedColumn.emplace_back(ParseJsonValue(columnDesc.Type, item.value())); - Alloc.Ref().LockObject(parsedColumn.back()); + columnParser.ParseJsonValue(item.value(), ParsedValues[columnId][rowId]); } catch (...) { - throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson << ", description: " << CurrentExceptionMessage(); + throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnParser.Name << "' with type " << columnParser.TypeYson << ", description: " << CurrentExceptionMessage(); } } rowId++; } + + const ui64 firstOffset = Buffer.Offsets.front(); if (rowId != Buffer.NumberValues) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId; + throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId; } - - for (size_t i = 0; i < Columns.size(); ++i) { - ResizeColumn(Columns[i], ParsedValues[i], Buffer.NumberValues); + for (const auto& columnDesc : Columns) { + columnDesc.ValidateNumberValues(rowId, firstOffset); } } @@ -204,145 +370,34 @@ class TJsonParser::TImpl { } private: - void ClearColumns(size_t reserveSize) { - for (auto& parsedColumn : ParsedValues) { - for (const auto& value : parsedColumn) { - Alloc.Ref().UnlockObject(value); - } - parsedColumn.clear(); - parsedColumn.reserve(reserveSize); - } - } - - void ResizeColumn(const TColumnDescription& columnDesc, NKikimr::NMiniKQL::TUnboxedValueVector& parsedColumn, size_t size) const { - if (columnDesc.Type->IsOptional()) { - parsedColumn.resize(size); - } else if (Y_UNLIKELY(parsedColumn.size() < size)) { - throw yexception() << "Failed to parse json string, found missing value at offset " << Buffer.Offsets[parsedColumn.size()] << " in non optional column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson; - } - } - - NYql::NUdf::TUnboxedValuePod ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const { - switch (type->GetKind()) { - case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { - const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type); - if (const auto dataSlot = dataType->GetDataSlot()) { - return ParseJsonValue(*dataSlot, jsonValue); - } - throw yexception() << "unsupported data type with id " << dataType->GetSchemeType(); - } - - case NKikimr::NMiniKQL::TTypeBase::EKind::Optional: { - if (jsonValue.is_null()) { - return NYql::NUdf::TUnboxedValuePod(); - } - return ParseJsonValue(AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType(), jsonValue).MakeOptional(); - } - - default: { - throw yexception() << "unsupported type kind " << type->GetKindAsStr(); - } + void ClearColumns(size_t newSize) { + const auto clearValue = [&allocState = Alloc.Ref()](NYql::NUdf::TUnboxedValue& value){ + value.UnlockRef(1); + value.Clear(); + }; + + for (size_t i = 0; i < Columns.size(); ++i) { + Columns[i].NumberValues = 0; + + auto& parsedColumn = ParsedValues[i]; + std::for_each(parsedColumn.begin(), parsedColumn.end(), clearValue); + parsedColumn.resize(newSize); } } - NYql::NUdf::TUnboxedValuePod ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const { - const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot); - switch (jsonValue.type()) { - case simdjson::fallback::ondemand::json_type::number: { - try { - switch (dataSlot) { - case NYql::NUdf::EDataSlot::Int8: - return ParseJsonNumber(jsonValue.get_int64().value()); - case NYql::NUdf::EDataSlot::Int16: - return ParseJsonNumber(jsonValue.get_int64().value()); - case NYql::NUdf::EDataSlot::Int32: - return ParseJsonNumber(jsonValue.get_int64().value()); - case NYql::NUdf::EDataSlot::Int64: - return NYql::NUdf::TUnboxedValuePod(jsonValue.get_int64().value()); - - case NYql::NUdf::EDataSlot::Uint8: - return ParseJsonNumber(jsonValue.get_uint64().value()); - case NYql::NUdf::EDataSlot::Uint16: - return ParseJsonNumber(jsonValue.get_uint64().value()); - case NYql::NUdf::EDataSlot::Uint32: - return ParseJsonNumber(jsonValue.get_uint64().value()); - case NYql::NUdf::EDataSlot::Uint64: - return NYql::NUdf::TUnboxedValuePod(jsonValue.get_uint64().value()); - - case NYql::NUdf::EDataSlot::Double: - return NYql::NUdf::TUnboxedValuePod(jsonValue.get_double().value()); - case NYql::NUdf::EDataSlot::Float: - return NYql::NUdf::TUnboxedValuePod(static_cast(jsonValue.get_double().value())); - - default: - throw yexception() << "number value is not expected for data type " << typeInfo.Name; - } - } catch (...) { - throw yexception() << "failed to parse data type " << typeInfo.Name << " from json number (raw: '" << TruncateString(jsonValue.raw_json_token()) << "'), error: " << CurrentExceptionMessage(); - } - } - - case simdjson::fallback::ondemand::json_type::string: { - const auto rawString = jsonValue.get_string().value(); - if (NYql::NUdf::TUnboxedValuePod result = NKikimr::NMiniKQL::ValueFromString(dataSlot, rawString)) { - return result; - } - throw yexception() << "failed to parse data type " << typeInfo.Name << " from json string: '" << TruncateString(rawString) << "'"; - } - - case simdjson::fallback::ondemand::json_type::array: - case simdjson::fallback::ondemand::json_type::object: { - const auto rawJson = jsonValue.raw_json().value(); - if (dataSlot != NYql::NUdf::EDataSlot::Json) { - throw yexception() << "found unexpected nested value (raw: '" << TruncateString(rawJson) << "'), expected data type " < - static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) { - if (number < std::numeric_limits::min() || std::numeric_limits::max() < number) { - throw yexception() << "number is out of range"; - } - return NYql::NUdf::TUnboxedValuePod(static_cast(number)); - } - - static TString TruncateString(std::string_view rawString, size_t maxSize = 1_KB) { - if (rawString.size() <= maxSize) { - return TString(rawString); - } - return TStringBuilder() << rawString.substr(0, maxSize) << " truncated..."; - } - private: NKikimr::NMiniKQL::TScopedAlloc Alloc; NKikimr::NMiniKQL::TTypeEnvironment TypeEnv; const ui64 BatchSize; const TDuration BatchCreationTimeout; - TVector Columns; + TVector Columns; absl::flat_hash_map ColumnsIndex; TJsonParserBuffer Buffer; simdjson::ondemand::parser Parser; - TVector ParsedValues; + TVector>> ParsedValues; }; TJsonParser::TJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index b3a1e8d5ab30..bdf74270094d 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -208,12 +208,15 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(MissingFieldsValidation, TFixture) { MakeParser({"a1", "a2"}, {"[DataType; String]", "[DataType; Uint64]"}); UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": "hello1", "a2": null, "event": "event1"})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [DataType; Uint64], description: (yexception) found unexpected null value, expected non optional data type Uint64"); - UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": 105, "event": "event1"})"), yexception, "Failed to parse json string, found missing value at offset 42 in non optional column 'a1' with type [DataType; String]"); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": 105, "event": "event1"})"), yexception, "Failed to parse json messages, found 1 missing values from offset 42 in non optional column 'a1' with type [DataType; String]"); } Y_UNIT_TEST_F(TypeKindsValidation, TFixture) { - MakeParser({"a2", "a1"}, {"[OptionalType; [DataType; String]]", "[ListType; [DataType; String]]"}); - UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": "hello1", "a1": ["key", "value"]})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a1' with type [ListType; [DataType; String]], description: (yexception) unsupported type kind List"); + UNIT_ASSERT_EXCEPTION_CONTAINS( + MakeParser({"a2", "a1"}, {"[OptionalType; [DataType; String]]", "[ListType; [DataType; String]]"}), + yexception, + "Failed to create parser for column 'a1' with type [ListType; [DataType; String]], description: (yexception) unsupported type kind List" + ); } Y_UNIT_TEST_F(NumbersValidation, TFixture) { From b96b6590a0ebd102ff7fc259a5eb896f4cc0c7f9 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Thu, 24 Oct 2024 16:55:34 +0000 Subject: [PATCH 6/6] Fixed LockObject --- ydb/core/fq/libs/row_dispatcher/json_parser.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 2092a12fee56..56946ef49a38 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -193,7 +193,7 @@ class TColumnParser { if (Y_UNLIKELY(!resultValue)) { throw yexception() << "failed to parse data type " << typeInfo.Name << " from json string: '" << TruncateString(rawString) << "'"; } - Y_ENSURE(resultValue.LockRef() == 1); + LockObject(resultValue); break; } @@ -207,7 +207,7 @@ class TColumnParser { throw yexception() << "found bad json value: '" << TruncateString(rawJson) << "'"; } resultValue = NKikimr::NMiniKQL::MakeString(rawJson); - Y_ENSURE(resultValue.LockRef() == 1); + LockObject(resultValue); break; } @@ -238,6 +238,11 @@ class TColumnParser { return NYql::NUdf::TUnboxedValuePod(static_cast(number)); } + static void LockObject(NYql::NUdf::TUnboxedValue& value) { + const i32 numberRefs = value.LockRef(); + Y_ENSURE(numberRefs == -1 || numberRefs == 1); + } + static TString TruncateString(std::string_view rawString, size_t maxSize = 1_KB) { if (rawString.size() <= maxSize) { return TString(rawString);