Skip to content

Commit

Permalink
YMQ fixes for 24-3 (ydb-platform#9646)
Browse files Browse the repository at this point in the history
  • Loading branch information
uzhastik committed Oct 1, 2024
1 parent 9809a3c commit 685988d
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 98 deletions.
9 changes: 7 additions & 2 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ namespace NKikimr::NHttpProxy {
.Counters = nullptr,
.AWSSignature = std::move(HttpContext.GetSignature()),
.IAMToken = HttpContext.IamToken,
.FolderID = ""
.FolderID = HttpContext.FolderId
};

auto authRequestProxy = MakeHolder<NSQS::THttpProxyAuthRequestProxy>(
Expand Down Expand Up @@ -1148,10 +1148,15 @@ namespace NKikimr::NHttpProxy {
SourceAddress = address;
}

DatabasePath = Request->URL;
DatabasePath = Request->URL.Before('?');
if (DatabasePath == "/") {
DatabasePath = "";
}
auto params = TCgiParameters(Request->URL.After('?'));
if (auto it = params.Find("folderId"); it != params.end()) {
FolderId = it->second;
}

//TODO: find out databaseId
ParseHeaders(Request->Headers);
}
Expand Down
82 changes: 69 additions & 13 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1625,6 +1625,20 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT_VALUES_EQUAL(resultMessage, "The specified queue doesn't exist.");
}

Y_UNIT_TEST_F(TestGetQueueUrlWithIAM, THttpProxyTestMock) {
auto req = CreateSqsGetQueueUrlRequest();
req["QueueName"] = "not-existing-queue";
auto res = SendHttpRequest("/Root?folderId=XXX", "AmazonSQS.GetQueueUrl", std::move(req), "X-YaCloud-SubjectToken: Bearer proxy_sa@builtin");
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);

NJson::TJsonValue json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
TString resultType = GetByPath<TString>(json, "__type");
UNIT_ASSERT_VALUES_EQUAL(resultType, "AWS.SimpleQueueService.NonExistentQueue");
TString resultMessage = GetByPath<TString>(json, "message");
UNIT_ASSERT_VALUES_EQUAL(resultMessage, "The specified queue doesn't exist.");
}

Y_UNIT_TEST_F(TestSendMessage, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", std::move(createQueueReq), FormAuthorizationStr("ru-central1"));
Expand All @@ -1645,7 +1659,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(!GetByPath<TString>(json, "SequenceNumber").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
}

Expand All @@ -1666,7 +1680,7 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {

res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
UNIT_ASSERT(!GetByPath<TString>(json, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(json, "MD5OfMessageBody").empty());
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);

for (int i = 0; i < 20; ++i) {
Expand Down Expand Up @@ -1698,16 +1712,58 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
TString resultQueueUrl = GetByPath<TString>(json, "QueueUrl");
UNIT_ASSERT(resultQueueUrl.EndsWith("ExampleQueueName"));

NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {"DelaySeconds"};
getQueueAttributes["AttributeNames"] = attributeNames;
{
NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {"DelaySeconds"};
getQueueAttributes["AttributeNames"] = attributeNames;

res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
}

{
NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {
"ApproximateNumberOfMessages",
"ApproximateNumberOfMessagesDelayed",
"ApproximateNumberOfMessagesNotVisible",
"CreatedTimestamp",
"DelaySeconds",
"MaximumMessageSize",
"MessageRetentionPeriod",
"ReceiveMessageWaitTimeSeconds",
"RedrivePolicy",
"VisibilityTimeout",
"FifoQueue",
"ContentBasedDeduplication",
"QueueArn"
};
getQueueAttributes["AttributeNames"] = attributeNames;

res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
}

{
NJson::TJsonValue getQueueAttributes;
getQueueAttributes["QueueUrl"] = resultQueueUrl;
NJson::TJsonArray attributeNames = {"All"};
getQueueAttributes["AttributeNames"] = attributeNames;

res = SendHttpRequest("/Root", "AmazonSQS.GetQueueAttributes", std::move(getQueueAttributes), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 200);
NJson::TJsonValue resultJson;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &resultJson));
UNIT_ASSERT_VALUES_EQUAL(resultJson["Attributes"]["DelaySeconds"], "1");
}
}

Y_UNIT_TEST_F(TestListQueues, THttpProxyTestMock) {
Expand Down Expand Up @@ -1911,8 +1967,8 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT(json["Successful"].GetArray().size() == 2);
auto succesful0 = json["Successful"][0];
UNIT_ASSERT(succesful0["Id"] == "Id-0");
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageAttributes").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "Md5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MD5OfMessageAttributes").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MD5OfMessageBody").empty());
UNIT_ASSERT(!GetByPath<TString>(succesful0, "MessageId").empty());
}

Expand Down
18 changes: 9 additions & 9 deletions ydb/public/api/protos/draft/ymq.proto
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ message GetQueueAttributesResult {
message GetQueueUrlRequest {
Ydb.Operations.OperationParams operation_params = 1;
string queue_name = 2;
optional string queue_owner_aws_account_id = 3;
optional string queue_owner_a_w_s_account_id = 3;
}

message GetQueueUrlResponse {
Expand Down Expand Up @@ -194,8 +194,8 @@ message ReceiveMessageResponse {
message Message {
map<string, string> attributes = 1;
string body = 2;
string md5_of_body = 3;
string md5_of_message_attributes = 4;
string m_d_5_of_body = 3;
string m_d_5_of_message_attributes = 4;
map<string, MessageAttribute> message_attributes = 5;
string message_id = 6;
string receipt_handle = 7;
Expand All @@ -221,9 +221,9 @@ message SendMessageResponse {
}

message SendMessageResult {
string md5_of_message_attributes = 1;
string md5_of_message_body= 2;
string md5_of_message_system_attributes= 3;
string m_d_5_of_message_attributes = 1;
string m_d_5_of_message_body= 2;
string m_d_5_of_message_system_attributes= 3;
string message_id = 4;
string sequence_number = 5;
}
Expand All @@ -248,10 +248,10 @@ message SendMessageBatchRequestEntry {

message SendMessageBatchResultEntry {
string id = 1;
string md5_of_message_body = 2;
string m_d_5_of_message_body = 2;
string message_id = 3;
string md5_of_message_attributes = 4;
string md5_of_message_system_attributes = 5;
string m_d_5_of_message_attributes = 4;
string m_d_5_of_message_system_attributes = 5;
string sequence_number = 6;
}

Expand Down
114 changes: 40 additions & 74 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ namespace NKikimr::NYmq::V1 {

Ydb::Ymq::V1::SendMessageResult GetResult(const NKikimrClient::TSqsResponse& resp) override {
Ydb::Ymq::V1::SendMessageResult result;
result.set_md5_of_message_attributes(GetResponse(resp).GetMD5OfMessageAttributes());
result.set_md5_of_message_body(GetResponse(resp).GetMD5OfMessageBody());
result.set_m_d_5_of_message_attributes(GetResponse(resp).GetMD5OfMessageAttributes());
result.set_m_d_5_of_message_body(GetResponse(resp).GetMD5OfMessageBody());
result.set_message_id(GetResponse(resp).GetMessageId());
result.set_sequence_number(std::to_string(GetResponse(resp).GetSequenceNumber()));
return result;
Expand Down Expand Up @@ -347,8 +347,8 @@ namespace NKikimr::NYmq::V1 {
}

dstMessage.set_body(srcMessage.GetData());
dstMessage.set_md5_of_body(srcMessage.GetMD5OfMessageBody());
dstMessage.set_md5_of_message_attributes(srcMessage.GetMD5OfMessageAttributes());
dstMessage.set_m_d_5_of_body(srcMessage.GetMD5OfMessageBody());
dstMessage.set_m_d_5_of_message_attributes(srcMessage.GetMD5OfMessageAttributes());

for (const auto& srcAttribute: srcMessage.GetMessageAttributes()) {
Ydb::Ymq::V1::MessageAttribute dstAttribute;
Expand Down Expand Up @@ -448,75 +448,41 @@ namespace NKikimr::NYmq::V1 {

Ydb::Ymq::V1::GetQueueAttributesResult GetResult(const NKikimrClient::TSqsResponse& resp) override {
Ydb::Ymq::V1::GetQueueAttributesResult result;
for (const auto& attributeName : Attributes) {
if (attributeName == APPROXIMATE_NUMBER_OF_MESSAGES) {
AddAttribute(
result,
APPROXIMATE_NUMBER_OF_MESSAGES,
GetResponse(resp).GetApproximateNumberOfMessages()
);
} else if (attributeName == APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED) {
AddAttribute(
result,
APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED,
GetResponse(resp).GetApproximateNumberOfMessagesDelayed()
);
} else if (attributeName == CREATED_TIMESTAMP) {
AddAttribute(
result,
CREATED_TIMESTAMP,
GetResponse(resp).GetCreatedTimestamp()
);
} else if (attributeName == DELAY_SECONDS) {
AddAttribute(
result,
DELAY_SECONDS,
GetResponse(resp).GetDelaySeconds()
);
} else if (attributeName == LAST_MODIFIED_TIMESTAMP) {
AddAttribute(
result,
LAST_MODIFIED_TIMESTAMP,
GetResponse(resp).GetLastModifiedTimestamp()
);
} else if (attributeName == MAXIMUM_MESSAGE_SIZE) {
AddAttribute(
result,
MAXIMUM_MESSAGE_SIZE,
GetResponse(resp).GetMaximumMessageSize()
);
} else if (attributeName == MESSAGE_RETENTION_PERIOD) {
AddAttribute(
result,
MESSAGE_RETENTION_PERIOD,
GetResponse(resp).GetMessageRetentionPeriod()
);
} else if (attributeName == QUEUE_ARN) {
AddAttribute(
result,
QUEUE_ARN,
GetResponse(resp).GetQueueArn()
);
} else if (attributeName == RECEIVE_MESSAGE_WAIT_TIME_SECONDS) {
AddAttribute(
result,
RECEIVE_MESSAGE_WAIT_TIME_SECONDS,
GetResponse(resp).GetReceiveMessageWaitTimeSeconds()
);
} else if (attributeName == VISIBILITY_TIMEOUT) {
AddAttribute(
result,
VISIBILITY_TIMEOUT,
GetResponse(resp).GetVisibilityTimeout()
);
} else if (attributeName == REDRIVE_POLICY) {
AddAttribute(
result,
REDRIVE_POLICY,
GetResponse(resp).GetRedrivePolicy()
);
}
const auto& attrs = resp.GetGetQueueAttributes();
if (attrs.HasApproximateNumberOfMessages()) {
AddAttribute(result, APPROXIMATE_NUMBER_OF_MESSAGES, attrs.GetApproximateNumberOfMessages());
}
if (attrs.HasApproximateNumberOfMessagesDelayed()) {
AddAttribute(result, APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, attrs.GetApproximateNumberOfMessagesDelayed());
}
if (attrs.HasCreatedTimestamp()) {
AddAttribute(result, CREATED_TIMESTAMP, attrs.GetCreatedTimestamp());
}
if (attrs.HasDelaySeconds()) {
AddAttribute(result, DELAY_SECONDS, attrs.GetDelaySeconds());
}
if (attrs.HasLastModifiedTimestamp()) {
AddAttribute(result, LAST_MODIFIED_TIMESTAMP, attrs.GetLastModifiedTimestamp());
}
if (attrs.HasMaximumMessageSize()) {
AddAttribute(result, MAXIMUM_MESSAGE_SIZE, attrs.GetMaximumMessageSize());
}
if (attrs.HasMessageRetentionPeriod()) {
AddAttribute(result, MESSAGE_RETENTION_PERIOD, attrs.GetMessageRetentionPeriod());
}
if (attrs.HasQueueArn()) {
AddAttribute(result, QUEUE_ARN, attrs.GetQueueArn());
}
if (attrs.HasReceiveMessageWaitTimeSeconds()) {
AddAttribute(result, RECEIVE_MESSAGE_WAIT_TIME_SECONDS, attrs.GetReceiveMessageWaitTimeSeconds());
}
if (attrs.HasVisibilityTimeout()) {
AddAttribute(result, VISIBILITY_TIMEOUT, attrs.GetVisibilityTimeout());
}
if (attrs.HasRedrivePolicy()) {
AddAttribute(result, REDRIVE_POLICY, attrs.GetRedrivePolicy());
}

return result;
}

Expand Down Expand Up @@ -819,8 +785,8 @@ namespace NKikimr::NYmq::V1 {
} else {
auto currentSuccessful = result.Addsuccessful();
currentSuccessful->Setid(entry.GetId());
currentSuccessful->Setmd5_of_message_attributes(entry.GetMD5OfMessageAttributes());
currentSuccessful->Setmd5_of_message_body(entry.GetMD5OfMessageBody());
currentSuccessful->set_m_d_5_of_message_attributes(entry.GetMD5OfMessageAttributes());
currentSuccessful->set_m_d_5_of_message_body(entry.GetMD5OfMessageBody());
currentSuccessful->Setmessage_id(entry.GetMessageId());
currentSuccessful->Setsequence_number(std::to_string(entry.GetSequenceNumber()));
}
Expand Down

0 comments on commit 685988d

Please sign in to comment.