Skip to content

Commit

Permalink
Merge c9b101f into 53eadb0
Browse files Browse the repository at this point in the history
  • Loading branch information
siarheivesialou authored Jun 13, 2024
2 parents 53eadb0 + c9b101f commit a9cc23f
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 27 deletions.
24 changes: 13 additions & 11 deletions ydb/core/ymq/actor/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ class TActionActor
if (TProxyActor::NeedCreateProxyActor(Action_)) {
configurationFlags |= TSqsEvents::TEvGetConfiguration::EFlags::NeedQueueLeader;
}
bool enableThrottling = (Action_ != EAction::CreateQueue);
this->Send(MakeSqsServiceID(this->SelfId().NodeId()),
MakeHolder<TSqsEvents::TEvGetConfiguration>(
RequestId_,
UserName_,
GetQueueName(),
FolderId_,
enableThrottling,
configurationFlags)
);
}
Expand Down Expand Up @@ -637,17 +640,16 @@ class TActionActor
return;
}

if (TDerived::NeedExistingQueue()) {
if (ev->Get()->Throttled) {
MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue.");
SendReplyAndDie();
return;
}
if (!ev->Get()->QueueExists) {
MakeError(MutableErrorDesc(), NErrors::NON_EXISTENT_QUEUE);
SendReplyAndDie();
return;
}
if (ev->Get()->Throttled) {
MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue.");
SendReplyAndDie();
return;
}

if (TDerived::NeedExistingQueue() && !ev->Get()->QueueExists) {
MakeError(MutableErrorDesc(), NErrors::NON_EXISTENT_QUEUE);
SendReplyAndDie();
return;
}

Y_ABORT_UNLESS(SchemeCache_);
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/ymq/actor/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ struct TSqsEvents {
TString RequestId;
TString UserName;
TString QueueName;
TString FolderId;
bool EnableThrottling = true;
ui64 Flags = 0;

enum EFlags {
Expand All @@ -175,6 +177,20 @@ struct TSqsEvents {
, QueueName(name)
, Flags(flags)
{ }
TEvGetConfiguration(
TString requestId,
const TString& user,
const TString& name,
const TString& folderId,
bool enableThrottling,
ui64 flags = 0
) : RequestId(std::move(requestId))
, UserName(user)
, QueueName(name)
, FolderId(folderId)
, EnableThrottling(enableThrottling)
, Flags(flags)
{ }
};

struct TQuoterResourcesForActions : public TAtomicRefCount<TQuoterResourcesForActions> {
Expand Down
28 changes: 19 additions & 9 deletions ydb/core/ymq/actor/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,18 +579,28 @@ void TSqsService::HandleGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr&
}

const auto queueIt = user->Queues_.find(queueName);
if (queueIt == user->Queues_.end()) {
if (RequestQueueListForUser(user, reqId)) {
LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader());
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list");
user->GetConfigurationRequests_.emplace(queueName, std::move(ev));
} else {
AnswerThrottled(ev);
}
if (queueIt != user->Queues_.end()) {
ProcessConfigurationRequestForQueue(ev, user, queueIt->second);
return;
} else if (ev->Get()->FolderId) {
const auto byNameAndFolderIt = user->QueueByNameAndFolder_ .find(
std::make_pair(ev->Get()->QueueName, ev->Get()->FolderId)
);
if (byNameAndFolderIt != user->QueueByNameAndFolder_.end()) {
ProcessConfigurationRequestForQueue(ev, user, byNameAndFolderIt->second);
return;
}
}

ProcessConfigurationRequestForQueue(ev, user, queueIt->second);
if (RequestQueueListForUser(user, reqId)) {
LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader());
RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list");
user->GetConfigurationRequests_.emplace(queueName, std::move(ev));
} else if (ev->Get()->EnableThrottling) {
AnswerThrottled(ev);
} else {
AnswerNotExists(ev, user);
}
}

void TSqsService::AnswerNotExists(TSqsEvents::TEvGetConfiguration::TPtr& ev, const TUserInfoPtr& userInfo) {
Expand Down
7 changes: 2 additions & 5 deletions ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,8 +880,5 @@ def get_attributes_of_nonexistent_queue():
)
)

# Check that getting queue url with custom name still works
assert_that(
lambda: self._sqs_api.get_queue_url(custom_queue_name),
not_(raises(RuntimeError))
)
received_queue_url = self._sqs_api.get_queue_url(custom_queue_name)
assert received_queue_url == queue_url
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from hamcrest import assert_that, raises
from hamcrest import assert_that, raises, not_

from ydb.tests.library.sqs.test_base import KikimrSqsTestBase

Expand Down Expand Up @@ -68,3 +68,22 @@ def get_attributes_of_nonexistent_queue():
pattern=throttling_exception_pattern
)
)

def test_that_queue_can_be_created_despite_lack_of_throttling_budget(self):
queue_url = self._create_queue_and_assert(self.queue_name, False, True)
nonexistent_queue_url = queue_url + "_nonex"

def get_attributes_of_nonexistent_queue():
self._sqs_api.get_queue_attributes(nonexistent_queue_url)

# Draining budget
for _ in range(16):
try:
get_attributes_of_nonexistent_queue()
except Exception:
pass

assert_that(
lambda: self._create_queue_and_assert("other_queue_name", False, True),
not_(raises(RuntimeError))
)
2 changes: 1 addition & 1 deletion ydb/tests/functional/sqs/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ TEST_SRCS(
test_queue_attributes_validation.py
test_queues_managing.py
test_format_without_version.py
test_throttling_nonexistent_queue.py
test_throttling.py
test_queue_counters.py
)

Expand Down

0 comments on commit a9cc23f

Please sign in to comment.