From c9b101fa7801d0addcd7dc3bfad038d1eb33dd9a Mon Sep 17 00:00:00 2001 From: Sergey Veselov Date: Thu, 13 Jun 2024 11:41:35 +0300 Subject: [PATCH] SQS-785: correct response to GetQueueUrl with custom queue name when throttling (#5439) --- ydb/core/ymq/actor/action.h | 24 ++++++++-------- ydb/core/ymq/actor/events.h | 16 +++++++++++ ydb/core/ymq/actor/service.cpp | 28 +++++++++++++------ .../sqs/cloud/test_yandex_cloud_mode.py | 7 ++--- ...onexistent_queue.py => test_throttling.py} | 21 +++++++++++++- ydb/tests/functional/sqs/common/ya.make | 2 +- 6 files changed, 71 insertions(+), 27 deletions(-) rename ydb/tests/functional/sqs/common/{test_throttling_nonexistent_queue.py => test_throttling.py} (74%) diff --git a/ydb/core/ymq/actor/action.h b/ydb/core/ymq/actor/action.h index 7efa47407050..b77c9f3a7d9f 100644 --- a/ydb/core/ymq/actor/action.h +++ b/ydb/core/ymq/actor/action.h @@ -77,11 +77,14 @@ class TActionActor if (TProxyActor::NeedCreateProxyActor(Action_)) { configurationFlags |= TSqsEvents::TEvGetConfiguration::EFlags::NeedQueueLeader; } + bool enableThrottling = (Action_ != EAction::CreateQueue); this->Send(MakeSqsServiceID(this->SelfId().NodeId()), MakeHolder( RequestId_, UserName_, GetQueueName(), + FolderId_, + enableThrottling, configurationFlags) ); } @@ -637,17 +640,16 @@ class TActionActor return; } - if (TDerived::NeedExistingQueue()) { - if (ev->Get()->Throttled) { - MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue."); - SendReplyAndDie(); - return; - } - if (!ev->Get()->QueueExists) { - MakeError(MutableErrorDesc(), NErrors::NON_EXISTENT_QUEUE); - SendReplyAndDie(); - return; - } + if (ev->Get()->Throttled) { + MakeError(MutableErrorDesc(), NErrors::THROTTLING_EXCEPTION, "Too many requests for nonexistent queue."); + SendReplyAndDie(); + return; + } + + if (TDerived::NeedExistingQueue() && !ev->Get()->QueueExists) { + MakeError(MutableErrorDesc(), NErrors::NON_EXISTENT_QUEUE); + SendReplyAndDie(); + return; } Y_ABORT_UNLESS(SchemeCache_); diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h index f8b1c2ca69a5..deef5ba29be8 100644 --- a/ydb/core/ymq/actor/events.h +++ b/ydb/core/ymq/actor/events.h @@ -160,6 +160,8 @@ struct TSqsEvents { TString RequestId; TString UserName; TString QueueName; + TString FolderId; + bool EnableThrottling = true; ui64 Flags = 0; enum EFlags { @@ -175,6 +177,20 @@ struct TSqsEvents { , QueueName(name) , Flags(flags) { } + TEvGetConfiguration( + TString requestId, + const TString& user, + const TString& name, + const TString& folderId, + bool enableThrottling, + ui64 flags = 0 + ) : RequestId(std::move(requestId)) + , UserName(user) + , QueueName(name) + , FolderId(folderId) + , EnableThrottling(enableThrottling) + , Flags(flags) + { } }; struct TQuoterResourcesForActions : public TAtomicRefCount { diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index 9181d88d9b64..5bb74f46c75d 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -579,18 +579,28 @@ void TSqsService::HandleGetConfiguration(TSqsEvents::TEvGetConfiguration::TPtr& } const auto queueIt = user->Queues_.find(queueName); - if (queueIt == user->Queues_.end()) { - if (RequestQueueListForUser(user, reqId)) { - LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader()); - RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list"); - user->GetConfigurationRequests_.emplace(queueName, std::move(ev)); - } else { - AnswerThrottled(ev); - } + if (queueIt != user->Queues_.end()) { + ProcessConfigurationRequestForQueue(ev, user, queueIt->second); return; + } else if (ev->Get()->FolderId) { + const auto byNameAndFolderIt = user->QueueByNameAndFolder_ .find( + std::make_pair(ev->Get()->QueueName, ev->Get()->FolderId) + ); + if (byNameAndFolderIt != user->QueueByNameAndFolder_.end()) { + ProcessConfigurationRequestForQueue(ev, user, byNameAndFolderIt->second); + return; + } } - ProcessConfigurationRequestForQueue(ev, user, queueIt->second); + if (RequestQueueListForUser(user, reqId)) { + LWPROBE(QueueRequestCacheMiss, userName, queueName, reqId, ev->Get()->ToStringHeader()); + RLOG_SQS_REQ_DEBUG(reqId, "Queue [" << userName << "/" << queueName << "] was not found in sqs service list. Requesting queues list"); + user->GetConfigurationRequests_.emplace(queueName, std::move(ev)); + } else if (ev->Get()->EnableThrottling) { + AnswerThrottled(ev); + } else { + AnswerNotExists(ev, user); + } } void TSqsService::AnswerNotExists(TSqsEvents::TEvGetConfiguration::TPtr& ev, const TUserInfoPtr& userInfo) { diff --git a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py index 415fcc54969d..ce34491b0946 100644 --- a/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py +++ b/ydb/tests/functional/sqs/cloud/test_yandex_cloud_mode.py @@ -880,8 +880,5 @@ def get_attributes_of_nonexistent_queue(): ) ) - # Check that getting queue url with custom name still works - assert_that( - lambda: self._sqs_api.get_queue_url(custom_queue_name), - not_(raises(RuntimeError)) - ) + received_queue_url = self._sqs_api.get_queue_url(custom_queue_name) + assert received_queue_url == queue_url diff --git a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py b/ydb/tests/functional/sqs/common/test_throttling.py similarity index 74% rename from ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py rename to ydb/tests/functional/sqs/common/test_throttling.py index 789d653e859b..516f6caebeef 100644 --- a/ydb/tests/functional/sqs/common/test_throttling_nonexistent_queue.py +++ b/ydb/tests/functional/sqs/common/test_throttling.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -from hamcrest import assert_that, raises +from hamcrest import assert_that, raises, not_ from ydb.tests.library.sqs.test_base import KikimrSqsTestBase @@ -68,3 +68,22 @@ def get_attributes_of_nonexistent_queue(): pattern=throttling_exception_pattern ) ) + + def test_that_queue_can_be_created_despite_lack_of_throttling_budget(self): + queue_url = self._create_queue_and_assert(self.queue_name, False, True) + nonexistent_queue_url = queue_url + "_nonex" + + def get_attributes_of_nonexistent_queue(): + self._sqs_api.get_queue_attributes(nonexistent_queue_url) + + # Draining budget + for _ in range(16): + try: + get_attributes_of_nonexistent_queue() + except Exception: + pass + + assert_that( + lambda: self._create_queue_and_assert("other_queue_name", False, True), + not_(raises(RuntimeError)) + ) diff --git a/ydb/tests/functional/sqs/common/ya.make b/ydb/tests/functional/sqs/common/ya.make index 664663fa2294..7fba7c875e3d 100644 --- a/ydb/tests/functional/sqs/common/ya.make +++ b/ydb/tests/functional/sqs/common/ya.make @@ -13,7 +13,7 @@ TEST_SRCS( test_queue_attributes_validation.py test_queues_managing.py test_format_without_version.py - test_throttling_nonexistent_queue.py + test_throttling.py test_queue_counters.py )