diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index 54fe15aca653..5e944d63e16e 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -312,6 +312,7 @@ class TGrpcRequestCheckActor SetTokenAndDie(); break; case Ydb::StatusIds::TIMEOUT: + case Ydb::StatusIds::CANCELLED: Counters_->IncDatabaseRateLimitedCounter(); LOG_INFO(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Throughput limit exceeded"); ReplyOverloadedAndDie(MakeIssue(NKikimrIssues::TIssuesIds::YDB_RESOURCE_USAGE_LIMITED, "Throughput limit exceeded")); @@ -331,7 +332,8 @@ class TGrpcRequestCheckActor } }; - req.mutable_operation_params()->mutable_operation_timeout()->set_nanos(200000000); // same as cloud-go serverless proxy + req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(10); + req.mutable_operation_params()->mutable_cancel_after()->set_nanos(200000000); // same as cloud-go serverless proxy NKikimr::NRpcService::RateLimiterAcquireUseSameMailbox( std::move(req), diff --git a/ydb/core/grpc_services/local_rate_limiter.cpp b/ydb/core/grpc_services/local_rate_limiter.cpp index d77fd26f9845..993c71e246e8 100644 --- a/ydb/core/grpc_services/local_rate_limiter.cpp +++ b/ydb/core/grpc_services/local_rate_limiter.cpp @@ -23,6 +23,7 @@ TActorId RateLimiterAcquireUseSameMailbox( onSuccess(); break; case Ydb::StatusIds::TIMEOUT: + case Ydb::StatusIds::CANCELLED: onTimeout(); break; default: @@ -32,7 +33,8 @@ TActorId RateLimiterAcquireUseSameMailbox( }; Ydb::RateLimiter::AcquireResourceRequest request; - SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout()); + SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout()); + SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after()); request.set_coordination_node_path(fullPath.CoordinationNode); request.set_resource_path(fullPath.ResourcePath); request.set_required(required); @@ -72,6 +74,7 @@ TActorId RateLimiterAcquireUseSameMailbox( onSuccess(); break; case Ydb::StatusIds::TIMEOUT: + case Ydb::StatusIds::CANCELLED: onTimeout(); break; default: @@ -82,7 +85,8 @@ TActorId RateLimiterAcquireUseSameMailbox( const auto& rlPath = maybeRlPath.GetRef(); Ydb::RateLimiter::AcquireResourceRequest request; - SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout()); + SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout()); + SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after()); request.set_coordination_node_path(rlPath.CoordinationNode); request.set_resource_path(rlPath.ResourcePath); request.set_required(required); diff --git a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp index 288527ae1e95..4ce11d417c4d 100644 --- a/ydb/core/grpc_services/rpc_rate_limiter_api.cpp +++ b/ydb/core/grpc_services/rpc_rate_limiter_api.cpp @@ -594,11 +594,18 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequestcoordination_node_path(), GetProtoRequest()->resource_path()), 0, 0); TBase::OnOperationTimeout(ctx); } + // Do nothing here, because quoter service replies after "cancel after" time passes. + void OnCancelOperation(const TActorContext& ctx) { + Y_UNUSED(ctx); + } + STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { hFunc(TEvQuota::TEvClearance, Handle); @@ -637,22 +644,37 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequestGet()->Result) { case TEvQuota::TEvClearance::EResult::Success: Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext()); - break; + break; case TEvQuota::TEvClearance::EResult::UnknownResource: Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext()); - break; + break; case TEvQuota::TEvClearance::EResult::Deadline: - Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext()); - break; + Reply(QuoterDeadlineStatusCode(), TActivationContext::AsActorContext()); + break; default: Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext()); } diff --git a/ydb/public/api/protos/ydb_rate_limiter.proto b/ydb/public/api/protos/ydb_rate_limiter.proto index a3aa37ae4420..2f415b8b1fcd 100644 --- a/ydb/public/api/protos/ydb_rate_limiter.proto +++ b/ydb/public/api/protos/ydb_rate_limiter.proto @@ -266,6 +266,11 @@ message DescribeResourceResult { // message AcquireResourceRequest { + // If cancel_after is set greater than zero and less than operation_timeout + // and resource is not ready after cancel_after time, + // the result code of this operation will be CANCELLED and resource will not be spent. + // It is recommended to specify both operation_timeout and cancel_after. + // cancel_after should be less than operation_timeout and non zero. Ydb.Operations.OperationParams operation_params = 1; // Path of a coordination node. diff --git a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h index dca90fbe639d..a65936900c3d 100644 --- a/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h +++ b/ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h @@ -161,6 +161,11 @@ class TRateLimiterClient { TAsyncDescribeResourceResult DescribeResource(const TString& coordinationNodePath, const TString& resourcePath, const TDescribeResourceSettings& = {}); // Acquire resources's units inside a coordination node. + // If CancelAfter is set greater than zero and less than OperationTimeout + // and resource is not ready after CancelAfter time, + // the result code of this operation will be CANCELLED and resource will not be spent. + // It is recommended to specify both OperationTimeout and CancelAfter. + // CancelAfter should be less than OperationTimeout. TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& = {}); private: diff --git a/ydb/services/rate_limiter/rate_limiter_ut.cpp b/ydb/services/rate_limiter/rate_limiter_ut.cpp index 4ac05a571c30..3a0a43a318d9 100644 --- a/ydb/services/rate_limiter/rate_limiter_ut.cpp +++ b/ydb/services/rate_limiter/rate_limiter_ut.cpp @@ -108,6 +108,9 @@ class TTestSetupAcquireActor : public TTestSetup { request.set_resource_path(ResourcePath); SetDuration(Settings.OperationTimeout_, *request.mutable_operation_params()->mutable_operation_timeout()); + if (Settings.CancelAfter_) { + SetDuration(Settings.CancelAfter_, *request.mutable_operation_params()->mutable_cancel_after()); + } if (Settings.IsUsedAmount_) { request.set_used(Settings.Amount_.GetRef()); @@ -317,7 +320,10 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) { return std::make_unique(); } - void AcquireResourceManyRequired(bool useActorApi) { + void AcquireResourceManyRequired(bool useActorApi, bool useCancelAfter) { + const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200); + const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set + using NYdb::NRateLimiter::TAcquireResourceSettings; auto setup = MakeTestSetup(useActorApi); @@ -325,42 +331,61 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) { ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res", TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42))); - setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS); for (int i = 0; i < 3; ++i) { - setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT); - setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS); } } - void AcquireResourceManyUsed(bool useActorApi) { + void AcquireResourceManyUsed(bool useActorApi, bool useCancelAfter) { + const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200); + const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set + using NYdb::NRateLimiter::TAcquireResourceSettings; auto setup = MakeTestSetup(useActorApi); ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res", TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42))); - setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS); for (int i = 0; i < 3; ++i) { - setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT); - setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT); + setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS); } } Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApi) { - AcquireResourceManyRequired(false); + AcquireResourceManyRequired(false, false); } Y_UNIT_TEST(AcquireResourceManyRequiredActorApi) { - AcquireResourceManyRequired(true); + AcquireResourceManyRequired(true, false); + } + + Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApiWithCancelAfter) { + AcquireResourceManyRequired(false, true); + } + + Y_UNIT_TEST(AcquireResourceManyRequiredActorApiWithCancelAfter) { + AcquireResourceManyRequired(true, true); } Y_UNIT_TEST(AcquireResourceManyUsedGrpcApi) { - AcquireResourceManyUsed(false); + AcquireResourceManyUsed(false, false); } Y_UNIT_TEST(AcquireResourceManyUsedActorApi) { - AcquireResourceManyUsed(true); + AcquireResourceManyUsed(true, false); + } + + Y_UNIT_TEST(AcquireResourceManyUsedGrpcApiWithCancelAfter) { + AcquireResourceManyUsed(false, true); + } + + Y_UNIT_TEST(AcquireResourceManyUsedActorApiWithCancelAfter) { + AcquireResourceManyUsed(true, true); } }