Skip to content

Commit

Permalink
Merge 0c1a8ca into aa99798
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Oct 6, 2024
2 parents aa99798 + 0c1a8ca commit 84439a4
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 37 deletions.
103 changes: 92 additions & 11 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1665,7 +1665,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {

Y_UNIT_TEST_F(TestReceiveMessage, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
Expand All @@ -1674,29 +1674,110 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {

NJson::TJsonValue sendMessageReq;
sendMessageReq["QueueUrl"] = resultQueueUrl;
auto body = "MessageBody-0";
sendMessageReq["MessageBody"] = body;
sendMessageReq["MessageBody"] = body;
auto body0 = "MessageBody-0";
sendMessageReq["MessageBody"] = body0;

res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", sendMessageReq, FormAuthorizationStr("ru-central1"));
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
for (int i = 0; i < 20; ++i) {
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", std::move(receiveMessageReq), FormAuthorizationStr("ru-central1"));
if (res.Body != TString("{}")) {
break;;
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", receiveMessageReq, FormAuthorizationStr("ru-central1"));
if (res.Body != "{}") {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(json["Messages"][0]["Body"], body);
UNIT_ASSERT_VALUES_EQUAL(json["Messages"][0]["Body"], body0);
}

Y_UNIT_TEST_F(TestReceiveMessageWithAttributes, THttpProxyTestMock) {
// Test if we process AttributeNames, MessageSystemAttributeNames, MessageAttributeNames correctly.

auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));

auto sendMessage = [this, resultQueueUrl](const TString& body) {
NJson::TJsonValue sendMessageReq;
sendMessageReq["QueueUrl"] = resultQueueUrl;
sendMessageReq["MessageBody"] = body;

auto res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", sendMessageReq, FormAuthorizationStr("ru-central1"));
NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
};

TString body = "MessageBody-0";
sendMessage(body);

auto receiveMessage = [this](NJson::TJsonValue request, const TString& expectedBody) -> NJson::TJsonValue {
request["VisibilityTimeout"] = 0; // Keep the message visible for next ReceiveMessage requests.
THttpResult res;
for (int i = 0; i < 20; ++i) {
res = SendHttpRequest("/Root", "AmazonSQS.ReceiveMessage", request, FormAuthorizationStr("ru-central1"));
if (res.Body != "{}") {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(json["Messages"][0]["Body"], expectedBody);
return json;
};

{
// Request SentTimestamp message system attribute using deprecated AttributeNames field.
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
receiveMessageReq["AttributeNames"] = NJson::TJsonArray{"SentTimestamp"};
json = receiveMessage(receiveMessageReq, body);
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
}

{
// Request SentTimestamp message system attribute using MessageSystemAttributeNames field.
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
receiveMessageReq["MessageSystemAttributeNames"] = NJson::TJsonArray{"SentTimestamp"};
json = receiveMessage(receiveMessageReq, body);
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
}

{
// Request All message system attributes using deprecated AttributeNames field.
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
receiveMessageReq["AttributeNames"] = NJson::TJsonArray{"All"};
json = receiveMessage(receiveMessageReq, body);
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
}

{
// Request All message system attributes using MessageSystemAttributeNames field.
NJson::TJsonValue receiveMessageReq;
receiveMessageReq["QueueUrl"] = resultQueueUrl;
receiveMessageReq["MessageSystemAttributeNames"] = NJson::TJsonArray{"All"};
json = receiveMessage(receiveMessageReq, body);
UNIT_ASSERT(!json["Messages"][0]["Attributes"]["SentTimestamp"].GetString().empty());
}
}

Y_UNIT_TEST_F(TestGetQueueAttributes, THttpProxyTestMock) {
Expand Down
49 changes: 23 additions & 26 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,29 +321,26 @@ namespace NKikimr::NYmq::V1 {
for (const auto& srcMessage: GetResponse(resp).GetMessages()) {
Ydb::Ymq::V1::Message dstMessage;

for (TString& attributeName : AttributesNames) {
if (attributeName == APPROXIMATE_RECEIVE_COUNT) {
dstMessage.Mutableattributes()->at(attributeName)
.assign(srcMessage.GetApproximateReceiveCount());
} else if (attributeName == APPROXIMATE_FIRST_RECEIVE_TIMESTAMP) {
dstMessage.Mutableattributes()->at(attributeName)
.assign(srcMessage.GetApproximateFirstReceiveTimestamp());
} else if (attributeName == MESSAGE_DEDUPLICATION_ID) {
dstMessage.Mutableattributes()->at(attributeName)
.assign(srcMessage.GetMessageDeduplicationId());
} else if (attributeName == MESSAGE_GROUP_ID) {
dstMessage.Mutableattributes()->at(attributeName)
.assign(srcMessage.GetMessageGroupId());
} else if (attributeName == SENDER_ID) {
dstMessage.Mutableattributes()->at(attributeName)
.assign(srcMessage.GetSenderId());
} else if (attributeName == SENT_TIMESTAMP) {
dstMessage.Mutableattributes()->at(attributeName)
.assign(srcMessage.GetSentTimestamp());
} else if (attributeName == SEQUENCE_NUMBER) {
dstMessage.Mutableattributes()->at(attributeName)
.assign(srcMessage.GetSequenceNumber());
}
if (srcMessage.HasApproximateReceiveCount()) {
dstMessage.Mutableattributes()->insert({APPROXIMATE_RECEIVE_COUNT, std::to_string(srcMessage.GetApproximateReceiveCount())});
}
if (srcMessage.HasApproximateFirstReceiveTimestamp()) {
dstMessage.Mutableattributes()->insert({APPROXIMATE_FIRST_RECEIVE_TIMESTAMP, std::to_string(srcMessage.GetApproximateFirstReceiveTimestamp())});
}
if (srcMessage.HasMessageDeduplicationId()) {
dstMessage.Mutableattributes()->insert({MESSAGE_DEDUPLICATION_ID, srcMessage.GetMessageDeduplicationId()});
}
if (srcMessage.HasMessageGroupId()) {
dstMessage.Mutableattributes()->insert({MESSAGE_GROUP_ID, srcMessage.GetMessageGroupId()});
}
if (srcMessage.HasSenderId()) {
dstMessage.Mutableattributes()->insert({SENDER_ID, srcMessage.GetSenderId()});
}
if (srcMessage.HasSentTimestamp()) {
dstMessage.Mutableattributes()->insert({SENT_TIMESTAMP, std::to_string(srcMessage.GetSentTimestamp())});
}
if (srcMessage.HasSequenceNumber()) {
dstMessage.Mutableattributes()->insert({SEQUENCE_NUMBER, std::to_string(srcMessage.GetSequenceNumber())});
}

dstMessage.set_body(srcMessage.GetData());
Expand Down Expand Up @@ -391,17 +388,17 @@ namespace NKikimr::NYmq::V1 {
// because AttributeNames is deprecated in favour of SystemAttributeNames
if (systemAttributeNames.size() > 0) {
for (int i = 0; i < systemAttributeNames.size(); i++) {
result->SetAttributeName(i, systemAttributeNames[i]);
result->AddAttributeName(systemAttributeNames[i]);
}
} else {
auto attributeNames = GetProtoRequest()->Getattribute_names();
for (int i = 0; i < attributeNames.size(); i++) {
result->SetAttributeName(i, attributeNames[i]);
result->AddAttributeName(attributeNames[i]);
}
}

for (int i = 0; i < GetProtoRequest()->Getmessage_attribute_names().size(); i++) {
result->SetMessageAttributeName(i, GetProtoRequest()->Getmessage_attribute_names()[i]);
result->AddMessageAttributeName(GetProtoRequest()->Getmessage_attribute_names()[i]);
}

return result;
Expand Down

0 comments on commit 84439a4

Please sign in to comment.