From 8a9234d8323177703d5929fe040f89f8405914c0 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Thu, 29 Aug 2024 08:13:20 +0000 Subject: [PATCH 1/2] LOGBROKER-8891: fix inability to parse json into protobuf map in http_proxy and fix returning MD5 of message attributes in SendMessageBatch --- ydb/core/http_proxy/json_proto_conversion.h | 51 ++++++++++++++------- ydb/core/http_proxy/ut/http_proxy_ut.h | 10 ++++ ydb/services/ymq/ymq_proxy.cpp | 1 + 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h index 9862aa4d1761..ee8ed381f099 100644 --- a/ydb/core/http_proxy/json_proto_conversion.h +++ b/ydb/core/http_proxy/json_proto_conversion.h @@ -147,59 +147,78 @@ inline void AddJsonObjectToProtoAsMap( const google::protobuf::Reflection* reflection, grpc::protobuf::Message* message, const JSON& jsonObject, + ui32 depth, std::function extractMap, - std::function valueToString + std::function valueToString, + std::function jsonObjectToMessage ) { const auto& protoMap = reflection->GetMutableRepeatedFieldRef(message, fieldDescriptor); for (const auto& [key, value] : extractMap(jsonObject)) { - std::unique_ptr stringStringEntry( + std::unique_ptr mapEntry( google::protobuf::MessageFactory::generated_factory() ->GetPrototype(fieldDescriptor->message_type()) ->New(message->GetArena()) ); - stringStringEntry + mapEntry ->GetReflection() - ->SetString(stringStringEntry.get(), fieldDescriptor->message_type()->field(0), key); - stringStringEntry - ->GetReflection() - ->SetString(stringStringEntry.get(), fieldDescriptor->message_type()->field(1), valueToString(value)); - protoMap.Add(*stringStringEntry); + ->SetString(mapEntry.get(), fieldDescriptor->message_type()->field(0), key); + + auto valueField = fieldDescriptor->message_type()->field(1); + if (valueField->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE) { + auto *msg = mapEntry->GetReflection()->MutableMessage(mapEntry.get(), valueField); + jsonObjectToMessage(value, msg, depth); + } else if (valueField->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_STRING) { + mapEntry->GetReflection()->SetString(mapEntry.get(), valueField, valueToString(value)); + } else { + throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::INVALID_PARAMETER_VALUE) + << "Only String and Object can be converted to protobuf map"; + } + protoMap.Add(std::move(*mapEntry)); } } +void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0); + inline void AddJsonObjectToProtoAsMap( const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection, grpc::protobuf::Message* message, - const NJson::TJsonValue& jsonObject + const NJson::TJsonValue& jsonObject, + ui32 depth ) { AddJsonObjectToProtoAsMap( fieldDescriptor, reflection, message, jsonObject, + depth, [](auto& json) { return json.GetMap(); }, - [](auto& value) -> const TString { return value.GetString(); } + [](auto& value) -> const TString { return value.GetString(); }, + [](auto& json, auto message, auto depth) { JsonToProto(json, message, depth); } ); } +void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0); inline void AddJsonObjectToProtoAsMap( const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection, grpc::protobuf::Message* message, - const nlohmann::basic_json<>& jsonObject + const nlohmann::basic_json<>& jsonObject, + ui32 depth ) { AddJsonObjectToProtoAsMap, std::map>>( fieldDescriptor, reflection, message, jsonObject, + depth, [](auto& json) { return json.template get>>(); }, - [](auto& value) -> const TString { return value.template get(); } + [](auto& value) -> const TString { return value.template get(); }, + [](auto& json, auto message, auto depth) { NlohmannJsonToProto(json, message, depth); } ); } -inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { +inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth) { Y_ENSURE(depth < 101, "Json depth is > 100"); Y_ENSURE_EX( !jsonValue.IsNull(), @@ -348,7 +367,7 @@ inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* break; case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { if (fieldDescriptor->is_map()) { - AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value); + AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value, depth); } else { auto *msg = reflection->MutableMessage(message, fieldDescriptor); JsonToProto(value, msg, depth + 1); @@ -366,7 +385,7 @@ inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* } } -inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { +inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth) { Y_ENSURE(depth < 101, "Json depth is > 100"); Y_ENSURE_EX( !jsonValue.is_null(), @@ -518,7 +537,7 @@ inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Mess break; case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { if (fieldDescriptor->is_map()) { - AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value); + AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value, depth); } else { auto *msg = reflection->MutableMessage(message, fieldDescriptor); NlohmannJsonToProto(value, msg, depth + 1); diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 8b43a0f98969..a825ea96c9cd 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1886,6 +1886,15 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { message0["MessageBody"] = "MessageBody-0"; message0["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + NJson::TJsonValue delaySeconds; + delaySeconds["StringValue"] = "1"; + delaySeconds["DataType"] = "String"; + + NJson::TJsonValue attributes; + attributes["DelaySeconds"] = delaySeconds; + + message0["MessageAttributes"] = attributes; + NJson::TJsonValue message1; message1["Id"] = "Id-1"; message1["MessageBody"] = "MessageBody-1"; @@ -1903,6 +1912,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT(json["Successful"].GetArray().size() == 2); auto succesful0 = json["Successful"][0]; UNIT_ASSERT(succesful0["Id"] == "Id-0"); + UNIT_ASSERT(!GetByPath(succesful0, "Md5OfMessageAttributes").empty()); UNIT_ASSERT(!GetByPath(succesful0, "Md5OfMessageBody").empty()); UNIT_ASSERT(!GetByPath(succesful0, "MessageId").empty()); } diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index 0453f6f5dfb0..cdb3684c3cd3 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -764,6 +764,7 @@ namespace NKikimr::NYmq::V1 { } else { auto currentSuccessful = result.Addsuccessful(); currentSuccessful->Setid(entry.GetId()); + currentSuccessful->Setmd5_of_message_attributes(entry.GetMD5OfMessageAttributes()); currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody()); currentSuccessful->Setmessage_id(entry.GetMessageId()); currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber())); From d0885ab121eff087fce70acde75261779a626660 Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Fri, 30 Aug 2024 12:18:35 +0000 Subject: [PATCH 2/2] fix map conversion depth --- ydb/core/http_proxy/json_proto_conversion.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h index ee8ed381f099..15a9142cc3ee 100644 --- a/ydb/core/http_proxy/json_proto_conversion.h +++ b/ydb/core/http_proxy/json_proto_conversion.h @@ -367,7 +367,7 @@ inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* break; case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { if (fieldDescriptor->is_map()) { - AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value, depth); + AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value, depth + 1); } else { auto *msg = reflection->MutableMessage(message, fieldDescriptor); JsonToProto(value, msg, depth + 1);