Skip to content

Commit

Permalink
LOGBROKER-8891: fix inability to parse json into protobuf map in http…
Browse files Browse the repository at this point in the history
…_proxy
  • Loading branch information
siarheivesialou committed Aug 30, 2024
1 parent 9e4a92a commit 6477839
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 12 deletions.
44 changes: 32 additions & 12 deletions ydb/core/http_proxy/json_proto_conversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@ inline void AddJsonObjectToProtoAsMap(
const google::protobuf::Reflection* reflection,
grpc::protobuf::Message* message,
const JSON& jsonObject,
ui32 depth,
std::function<const MAP(const JSON&)> extractMap,
std::function<const TString(const JSON&)> valueToString
std::function<const TString(const JSON&)> valueToString,
std::function<void(const JSON&, grpc::protobuf::Message*, ui32)> jsonObjectToMessage
) {
const auto& protoMap = reflection->GetMutableRepeatedFieldRef<google::protobuf::Message>(message, fieldDescriptor);
for (const auto& [key, value] : extractMap(jsonObject)) {
Expand All @@ -160,46 +162,64 @@ inline void AddJsonObjectToProtoAsMap(
stringStringEntry
->GetReflection()
->SetString(stringStringEntry.get(), fieldDescriptor->message_type()->field(0), key);
stringStringEntry
->GetReflection()
->SetString(stringStringEntry.get(), fieldDescriptor->message_type()->field(1), valueToString(value));

if (fieldDescriptor->message_type()->field(1)->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE) {
auto *msg = stringStringEntry->GetReflection()->MutableMessage(stringStringEntry.get(), fieldDescriptor->message_type()->field(1));
jsonObjectToMessage(value, msg, depth);
} else if (fieldDescriptor->message_type()->field(1)->cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_STRING) {
stringStringEntry
->GetReflection()
->SetString(stringStringEntry.get(), fieldDescriptor->message_type()->field(1), valueToString(value));
} else {
throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::INVALID_PARAMETER_VALUE)
<< "Only String and Object can be converted to protobuf map";
}
protoMap.Add(*stringStringEntry);
}
}

void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0);

inline void AddJsonObjectToProtoAsMap(
const google::protobuf::FieldDescriptor* fieldDescriptor,
const google::protobuf::Reflection* reflection,
grpc::protobuf::Message* message,
const NJson::TJsonValue& jsonObject
const NJson::TJsonValue& jsonObject,
ui32 depth
) {
AddJsonObjectToProtoAsMap<NJson::TJsonValue, NJson::TJsonValue::TMapType>(
fieldDescriptor,
reflection,
message,
jsonObject,
depth,
[](auto& json) { return json.GetMap(); },
[](auto& value) -> const TString { return value.GetString(); }
[](auto& value) -> const TString { return value.GetString(); },
[](auto& json, auto message, auto depth) { JsonToProto(json, message, depth); }
);
}
void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0);

inline void AddJsonObjectToProtoAsMap(
const google::protobuf::FieldDescriptor* fieldDescriptor,
const google::protobuf::Reflection* reflection,
grpc::protobuf::Message* message,
const nlohmann::basic_json<>& jsonObject
const nlohmann::basic_json<>& jsonObject,
ui32 depth
) {
AddJsonObjectToProtoAsMap<nlohmann::basic_json<>, std::map<TString, nlohmann::basic_json<>>>(
fieldDescriptor,
reflection,
message,
jsonObject,
depth,
[](auto& json) { return json.template get<std::map<TString, nlohmann::basic_json<>>>(); },
[](auto& value) -> const TString { return value.template get<TString>(); }
[](auto& value) -> const TString { return value.template get<TString>(); },
[](auto& json, auto message, auto depth) { NlohmannJsonToProto(json, message, depth); }
);
}

inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) {
inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth) {
Y_ENSURE(depth < 101, "Json depth is > 100");
Y_ENSURE_EX(
!jsonValue.IsNull(),
Expand Down Expand Up @@ -348,7 +368,7 @@ inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message*
break;
case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
if (fieldDescriptor->is_map()) {
AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value);
AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value, depth);
} else {
auto *msg = reflection->MutableMessage(message, fieldDescriptor);
JsonToProto(value, msg, depth + 1);
Expand All @@ -366,7 +386,7 @@ inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message*
}
}

inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) {
inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth) {
Y_ENSURE(depth < 101, "Json depth is > 100");
Y_ENSURE_EX(
!jsonValue.is_null(),
Expand Down Expand Up @@ -518,7 +538,7 @@ inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Mess
break;
case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
if (fieldDescriptor->is_map()) {
AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value);
AddJsonObjectToProtoAsMap(fieldDescriptor, reflection, message, value, depth);
} else {
auto *msg = reflection->MutableMessage(message, fieldDescriptor);
NlohmannJsonToProto(value, msg, depth + 1);
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1886,6 +1886,15 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
message0["MessageBody"] = "MessageBody-0";
message0["MessageDeduplicationId"] = "MessageDeduplicationId-0";

NJson::TJsonValue delaySeconds;
delaySeconds["StringValue"] = "1";
delaySeconds["DataType"] = "String";

NJson::TJsonValue attributes;
attributes["DelaySeconds"] = delaySeconds;

message0["MessageAttributes"] = attributes;

NJson::TJsonValue message1;
message1["Id"] = "Id-1";
message1["MessageBody"] = "MessageBody-1";
Expand All @@ -1903,6 +1912,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
auto succesful0 = json["Successful"][0];
UNIT_ASSERT(succesful0["Id"] == "Id-0");
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageAttributes").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,7 @@ namespace NKikimr::NYmq::V1 {
} else {
auto currentSuccessful = result.Addsuccessful();
currentSuccessful->Setid(entry.GetId());
currentSuccessful->Setmd5_of_message_attributes(entry.GetMD5OfMessageAttributes());
currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody());
currentSuccessful->Setmessage_id(entry.GetMessageId());
currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber()));
Expand Down

0 comments on commit 6477839

Please sign in to comment.