diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 6eb60e2f0f72..2092a12fee56 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -72,6 +72,183 @@ struct TJsonParserBuffer { TStringBuilder Values = {}; }; +class TColumnParser { + using TParser = std::function; + +public: + const std::string Name; + const TString TypeYson; + const NKikimr::NMiniKQL::TType* TypeMkql; + const bool IsOptional = false; + size_t NumberValues = 0; + +public: + TColumnParser(const TString& name, const TString& typeYson, NKikimr::NMiniKQL::TProgramBuilder& programBuilder) + : Name(name) + , TypeYson(typeYson) + , TypeMkql(NYql::NCommon::ParseTypeFromYson(TStringBuf(typeYson), programBuilder, Cerr)) + , IsOptional(TypeMkql->IsOptional()) + , NumberValues(0) + { + try { + Parser = CreateParser(TypeMkql); + } catch (...) { + throw yexception() << "Failed to create parser for column '" << Name << "' with type " << TypeYson << ", description: " << CurrentExceptionMessage(); + } + } + + void ParseJsonValue(simdjson::fallback::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + Parser(jsonValue, resultValue); + NumberValues++; + } + + void ValidateNumberValues(size_t expectedNumberValues, ui64 firstOffset) const { + if (Y_UNLIKELY(!IsOptional && NumberValues < expectedNumberValues)) { + throw yexception() << "Failed to parse json messages, found " << expectedNumberValues - NumberValues << " missing values from offset " << firstOffset << " in non optional column '" << Name << "' with type " << TypeYson; + } + } + +private: + TParser CreateParser(const NKikimr::NMiniKQL::TType* type, bool optional = false) const { + switch (type->GetKind()) { + case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { + const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type); + if (const auto dataSlot = dataType->GetDataSlot()) { + return GetJsonValueParser(*dataSlot, optional); + } + throw yexception() << "unsupported data type with id " << dataType->GetSchemeType(); + } + + case NKikimr::NMiniKQL::TTypeBase::EKind::Optional: { + return AddOptional(CreateParser(AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType(), true)); + } + + default: { + throw yexception() << "unsupported type kind " << type->GetKindAsStr(); + } + } + } + + static TParser AddOptional(TParser parser) { + return [parser](simdjson::fallback::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + parser(std::move(jsonValue), resultValue); + if (resultValue) { + resultValue = resultValue.MakeOptional(); + } + }; + } + + static TParser GetJsonValueParser(NYql::NUdf::EDataSlot dataSlot, bool optional) { + const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot); + return [dataSlot, optional, &typeInfo](simdjson::fallback::ondemand::value jsonValue, NYql::NUdf::TUnboxedValue& resultValue) { + switch (jsonValue.type()) { + case simdjson::fallback::ondemand::json_type::number: { + try { + switch (dataSlot) { + case NYql::NUdf::EDataSlot::Int8: + resultValue = ParseJsonNumber(jsonValue.get_int64().value()); + break; + case NYql::NUdf::EDataSlot::Int16: + resultValue = ParseJsonNumber(jsonValue.get_int64().value()); + break; + case NYql::NUdf::EDataSlot::Int32: + resultValue = ParseJsonNumber(jsonValue.get_int64().value()); + break; + case NYql::NUdf::EDataSlot::Int64: + resultValue = NYql::NUdf::TUnboxedValuePod(jsonValue.get_int64().value()); + break; + + case NYql::NUdf::EDataSlot::Uint8: + resultValue = ParseJsonNumber(jsonValue.get_uint64().value()); + break; + case NYql::NUdf::EDataSlot::Uint16: + resultValue = ParseJsonNumber(jsonValue.get_uint64().value()); + break; + case NYql::NUdf::EDataSlot::Uint32: + resultValue = ParseJsonNumber(jsonValue.get_uint64().value()); + break; + case NYql::NUdf::EDataSlot::Uint64: + resultValue = NYql::NUdf::TUnboxedValuePod(jsonValue.get_uint64().value()); + break; + + case NYql::NUdf::EDataSlot::Double: + resultValue = NYql::NUdf::TUnboxedValuePod(jsonValue.get_double().value()); + break; + case NYql::NUdf::EDataSlot::Float: + resultValue = NYql::NUdf::TUnboxedValuePod(static_cast(jsonValue.get_double().value())); + break; + + default: + throw yexception() << "number value is not expected for data type " << typeInfo.Name; + } + } catch (...) { + throw yexception() << "failed to parse data type " << typeInfo.Name << " from json number (raw: '" << TruncateString(jsonValue.raw_json_token()) << "'), error: " << CurrentExceptionMessage(); + } + break; + } + + case simdjson::fallback::ondemand::json_type::string: { + const auto rawString = jsonValue.get_string().value(); + resultValue = NKikimr::NMiniKQL::ValueFromString(dataSlot, rawString); + if (Y_UNLIKELY(!resultValue)) { + throw yexception() << "failed to parse data type " << typeInfo.Name << " from json string: '" << TruncateString(rawString) << "'"; + } + Y_ENSURE(resultValue.LockRef() == 1); + break; + } + + case simdjson::fallback::ondemand::json_type::array: + case simdjson::fallback::ondemand::json_type::object: { + const auto rawJson = jsonValue.raw_json().value(); + if (Y_UNLIKELY(dataSlot != NYql::NUdf::EDataSlot::Json)) { + throw yexception() << "found unexpected nested value (raw: '" << TruncateString(rawJson) << "'), expected data type " < + static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) { + if (number < std::numeric_limits::min() || std::numeric_limits::max() < number) { + throw yexception() << "number is out of range"; + } + return NYql::NUdf::TUnboxedValuePod(static_cast(number)); + } + + static TString TruncateString(std::string_view rawString, size_t maxSize = 1_KB) { + if (rawString.size() <= maxSize) { + return TString(rawString); + } + return TStringBuilder() << rawString.substr(0, maxSize) << " truncated..."; + } + +private: + TParser Parser; +}; + } // anonymous namespace namespace NFq { @@ -79,12 +256,6 @@ namespace NFq { //// TJsonParser class TJsonParser::TImpl { - struct TColumnDescription { - std::string Name; - TString TypeYson; - NKikimr::NMiniKQL::TType* Type; - }; - public: TImpl(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) : Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true, false) @@ -94,7 +265,6 @@ class TJsonParser::TImpl { , ParsedValues(columns.size()) { Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal"); - LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name()); with_lock (Alloc) { auto functonRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, {}); @@ -102,11 +272,7 @@ class TJsonParser::TImpl { Columns.reserve(columns.size()); for (size_t i = 0; i < columns.size(); i++) { - Columns.emplace_back(TColumnDescription{ - .Name = columns[i], - .TypeYson = types[i], - .Type = NYql::NCommon::ParseTypeFromYson(TStringBuf(types[i]), programBuilder, Cerr) - }); + Columns.emplace_back(columns[i], types[i], programBuilder); } } @@ -116,6 +282,8 @@ class TJsonParser::TImpl { } Buffer.Reserve(BatchSize, 1); + + LOG_ROW_DISPATCHER_INFO("Simdjson active implementation " << simdjson::get_active_implementation()->name()); Parser.threaded = false; } @@ -164,25 +332,23 @@ class TJsonParser::TImpl { continue; } - const TColumnDescription& columnDesc = Columns[it->second]; - auto& parsedColumn = ParsedValues[it->second]; - ResizeColumn(columnDesc, parsedColumn, rowId); - + const size_t columnId = it->second; + auto& columnParser = Columns[columnId]; try { - parsedColumn.emplace_back(ParseJsonValue(columnDesc.Type, item.value())); - Alloc.Ref().LockObject(parsedColumn.back()); + columnParser.ParseJsonValue(item.value(), ParsedValues[columnId][rowId]); } catch (...) { - throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson << ", description: " << CurrentExceptionMessage(); + throw yexception() << "Failed to parse json string at offset " << Buffer.Offsets[rowId] << ", got parsing error for column '" << columnParser.Name << "' with type " << columnParser.TypeYson << ", description: " << CurrentExceptionMessage(); } } rowId++; } + + const ui64 firstOffset = Buffer.Offsets.front(); if (rowId != Buffer.NumberValues) { - throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << Buffer.Offsets.front() << " but got " << rowId; + throw yexception() << "Failed to parse json messages, expected " << Buffer.NumberValues << " json rows from offset " << firstOffset << " but got " << rowId; } - - for (size_t i = 0; i < Columns.size(); ++i) { - ResizeColumn(Columns[i], ParsedValues[i], Buffer.NumberValues); + for (const auto& columnDesc : Columns) { + columnDesc.ValidateNumberValues(rowId, firstOffset); } } @@ -204,145 +370,34 @@ class TJsonParser::TImpl { } private: - void ClearColumns(size_t reserveSize) { - for (auto& parsedColumn : ParsedValues) { - for (const auto& value : parsedColumn) { - Alloc.Ref().UnlockObject(value); - } - parsedColumn.clear(); - parsedColumn.reserve(reserveSize); - } - } - - void ResizeColumn(const TColumnDescription& columnDesc, NKikimr::NMiniKQL::TUnboxedValueVector& parsedColumn, size_t size) const { - if (columnDesc.Type->IsOptional()) { - parsedColumn.resize(size); - } else if (Y_UNLIKELY(parsedColumn.size() < size)) { - throw yexception() << "Failed to parse json string, found missing value at offset " << Buffer.Offsets[parsedColumn.size()] << " in non optional column '" << columnDesc.Name << "' with type " << columnDesc.TypeYson; - } - } - - NYql::NUdf::TUnboxedValuePod ParseJsonValue(const NKikimr::NMiniKQL::TType* type, simdjson::fallback::ondemand::value jsonValue) const { - switch (type->GetKind()) { - case NKikimr::NMiniKQL::TTypeBase::EKind::Data: { - const auto* dataType = AS_TYPE(NKikimr::NMiniKQL::TDataType, type); - if (const auto dataSlot = dataType->GetDataSlot()) { - return ParseJsonValue(*dataSlot, jsonValue); - } - throw yexception() << "unsupported data type with id " << dataType->GetSchemeType(); - } - - case NKikimr::NMiniKQL::TTypeBase::EKind::Optional: { - if (jsonValue.is_null()) { - return NYql::NUdf::TUnboxedValuePod(); - } - return ParseJsonValue(AS_TYPE(NKikimr::NMiniKQL::TOptionalType, type)->GetItemType(), jsonValue).MakeOptional(); - } - - default: { - throw yexception() << "unsupported type kind " << type->GetKindAsStr(); - } + void ClearColumns(size_t newSize) { + const auto clearValue = [&allocState = Alloc.Ref()](NYql::NUdf::TUnboxedValue& value){ + value.UnlockRef(1); + value.Clear(); + }; + + for (size_t i = 0; i < Columns.size(); ++i) { + Columns[i].NumberValues = 0; + + auto& parsedColumn = ParsedValues[i]; + std::for_each(parsedColumn.begin(), parsedColumn.end(), clearValue); + parsedColumn.resize(newSize); } } - NYql::NUdf::TUnboxedValuePod ParseJsonValue(NYql::NUdf::EDataSlot dataSlot, simdjson::fallback::ondemand::value jsonValue) const { - const auto& typeInfo = NYql::NUdf::GetDataTypeInfo(dataSlot); - switch (jsonValue.type()) { - case simdjson::fallback::ondemand::json_type::number: { - try { - switch (dataSlot) { - case NYql::NUdf::EDataSlot::Int8: - return ParseJsonNumber(jsonValue.get_int64().value()); - case NYql::NUdf::EDataSlot::Int16: - return ParseJsonNumber(jsonValue.get_int64().value()); - case NYql::NUdf::EDataSlot::Int32: - return ParseJsonNumber(jsonValue.get_int64().value()); - case NYql::NUdf::EDataSlot::Int64: - return NYql::NUdf::TUnboxedValuePod(jsonValue.get_int64().value()); - - case NYql::NUdf::EDataSlot::Uint8: - return ParseJsonNumber(jsonValue.get_uint64().value()); - case NYql::NUdf::EDataSlot::Uint16: - return ParseJsonNumber(jsonValue.get_uint64().value()); - case NYql::NUdf::EDataSlot::Uint32: - return ParseJsonNumber(jsonValue.get_uint64().value()); - case NYql::NUdf::EDataSlot::Uint64: - return NYql::NUdf::TUnboxedValuePod(jsonValue.get_uint64().value()); - - case NYql::NUdf::EDataSlot::Double: - return NYql::NUdf::TUnboxedValuePod(jsonValue.get_double().value()); - case NYql::NUdf::EDataSlot::Float: - return NYql::NUdf::TUnboxedValuePod(static_cast(jsonValue.get_double().value())); - - default: - throw yexception() << "number value is not expected for data type " << typeInfo.Name; - } - } catch (...) { - throw yexception() << "failed to parse data type " << typeInfo.Name << " from json number (raw: '" << TruncateString(jsonValue.raw_json_token()) << "'), error: " << CurrentExceptionMessage(); - } - } - - case simdjson::fallback::ondemand::json_type::string: { - const auto rawString = jsonValue.get_string().value(); - if (NYql::NUdf::TUnboxedValuePod result = NKikimr::NMiniKQL::ValueFromString(dataSlot, rawString)) { - return result; - } - throw yexception() << "failed to parse data type " << typeInfo.Name << " from json string: '" << TruncateString(rawString) << "'"; - } - - case simdjson::fallback::ondemand::json_type::array: - case simdjson::fallback::ondemand::json_type::object: { - const auto rawJson = jsonValue.raw_json().value(); - if (dataSlot != NYql::NUdf::EDataSlot::Json) { - throw yexception() << "found unexpected nested value (raw: '" << TruncateString(rawJson) << "'), expected data type " < - static NYql::NUdf::TUnboxedValuePod ParseJsonNumber(TJsonNumber number) { - if (number < std::numeric_limits::min() || std::numeric_limits::max() < number) { - throw yexception() << "number is out of range"; - } - return NYql::NUdf::TUnboxedValuePod(static_cast(number)); - } - - static TString TruncateString(std::string_view rawString, size_t maxSize = 1_KB) { - if (rawString.size() <= maxSize) { - return TString(rawString); - } - return TStringBuilder() << rawString.substr(0, maxSize) << " truncated..."; - } - private: NKikimr::NMiniKQL::TScopedAlloc Alloc; NKikimr::NMiniKQL::TTypeEnvironment TypeEnv; const ui64 BatchSize; const TDuration BatchCreationTimeout; - TVector Columns; + TVector Columns; absl::flat_hash_map ColumnsIndex; TJsonParserBuffer Buffer; simdjson::ondemand::parser Parser; - TVector ParsedValues; + TVector>> ParsedValues; }; TJsonParser::TJsonParser(const TVector& columns, const TVector& types, ui64 batchSize, TDuration batchCreationTimeout) diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index b3a1e8d5ab30..bdf74270094d 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -208,12 +208,15 @@ Y_UNIT_TEST_SUITE(TJsonParserTests) { Y_UNIT_TEST_F(MissingFieldsValidation, TFixture) { MakeParser({"a1", "a2"}, {"[DataType; String]", "[DataType; Uint64]"}); UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a1": "hello1", "a2": null, "event": "event1"})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a2' with type [DataType; Uint64], description: (yexception) found unexpected null value, expected non optional data type Uint64"); - UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": 105, "event": "event1"})"), yexception, "Failed to parse json string, found missing value at offset 42 in non optional column 'a1' with type [DataType; String]"); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": 105, "event": "event1"})"), yexception, "Failed to parse json messages, found 1 missing values from offset 42 in non optional column 'a1' with type [DataType; String]"); } Y_UNIT_TEST_F(TypeKindsValidation, TFixture) { - MakeParser({"a2", "a1"}, {"[OptionalType; [DataType; String]]", "[ListType; [DataType; String]]"}); - UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(42, R"({"a2": "hello1", "a1": ["key", "value"]})"), yexception, "Failed to parse json string at offset 42, got parsing error for column 'a1' with type [ListType; [DataType; String]], description: (yexception) unsupported type kind List"); + UNIT_ASSERT_EXCEPTION_CONTAINS( + MakeParser({"a2", "a1"}, {"[OptionalType; [DataType; String]]", "[ListType; [DataType; String]]"}), + yexception, + "Failed to create parser for column 'a1' with type [ListType; [DataType; String]], description: (yexception) unsupported type kind List" + ); } Y_UNIT_TEST_F(NumbersValidation, TFixture) {