From d227d319db0ef6712d192df20f9ac220ceacc560 Mon Sep 17 00:00:00 2001 From: Alek5andr-Kotov <152866892+Alek5andr-Kotov@users.noreply.github.com> Date: Tue, 26 Mar 2024 12:02:56 +0300 Subject: [PATCH] duplicated code in tests (#3149) --- ydb/core/persqueue/ut/partition_ut.cpp | 182 +++++++++---------------- 1 file changed, 67 insertions(+), 115 deletions(-) diff --git a/ydb/core/persqueue/ut/partition_ut.cpp b/ydb/core/persqueue/ut/partition_ut.cpp index 4ccb0b75f781..681673d714e6 100644 --- a/ydb/core/persqueue/ut/partition_ut.cpp +++ b/ydb/core/persqueue/ut/partition_ut.cpp @@ -263,6 +263,8 @@ class TPartitionFixture : public NUnitTest::TBaseFixture { void SendGetWriteInfo(ui32 internalPartitionId); void ShadowPartitionCountersTest(bool isFirstClass); + void TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline); + TMaybe Ctx; TMaybe Finalizer; @@ -1042,6 +1044,69 @@ void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) { } +void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration, bool ignoreQuotaDeadline) +{ + Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); + Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(quotaWaitDuration.MilliSeconds()); + Ctx->Runtime->SetLogPriority( NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); + + CreatePartition({ + .Partition=TPartitionId{1}, + .Begin=0, .End=10, + // + // partition configuration + // + .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} + }, + // + // tablet configuration + // + {.Version=2, .Consumers={{.Consumer="client-1"}}}); + + SendSubDomainStatus(true); + + ui64 cookie = 1; + ui64 messageNo = 0; + + SendChangeOwner(cookie, "owner1", Ctx->Edge, true); + auto ownerEvent = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); + UNIT_ASSERT(ownerEvent != nullptr); + auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); + + TAutoPtr handle; + std::function truth = [&](const TEvPQ::TEvProxyResponse& e) { + return cookie == e.Cookie; + }; + + TString data = "data for write"; + + // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline); + messageNo++; + + SendDiskStatusResponse(); + { + auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event != nullptr); + } + + // Second message will not be processed because the limit is exceeded. + SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, ignoreQuotaDeadline); + messageNo++; + + SendDiskStatusResponse(); + auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event == nullptr); + + // SudDomain quota available - second message will be processed.. + SendSubDomainStatus(false); + SendDiskStatusResponse(); + + event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); + UNIT_ASSERT(event != nullptr); + UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus()); +} + Y_UNIT_TEST_F(Batching, TPartitionFixture) { CreatePartition(); @@ -1666,125 +1731,12 @@ Y_UNIT_TEST_F(WriteSubDomainOutOfSpace, TPartitionFixture) Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_DisableExpiration, TPartitionFixture) { - Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); - // disable write request expiration while thes wait quota - Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(0); - Ctx->Runtime->SetLogPriority( NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); - - CreatePartition({ - .Partition=TPartitionId{1}, - .Begin=0, .End=10, - // - // partition configuration - // - .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} - }, - // - // tablet configuration - // - {.Version=2, .Consumers={{.Consumer="client-1"}}}); - - SendSubDomainStatus(true); - - ui64 cookie = 1; - ui64 messageNo = 0; - - SendChangeOwner(cookie, "owner1", Ctx->Edge, true); - auto ownerEvent = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); - UNIT_ASSERT(ownerEvent != nullptr); - auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); - - TAutoPtr handle; - std::function truth = [&](const TEvPQ::TEvProxyResponse& e) { - return cookie == e.Cookie; - }; - - TString data = "data for write"; - - // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. - SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); - messageNo++; - - SendDiskStatusResponse(); - { - auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); - UNIT_ASSERT(event != nullptr); - } - - // Second message will not be processed because the limit is exceeded. - SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data); - messageNo++; - - SendDiskStatusResponse(); - auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); - UNIT_ASSERT(event == nullptr); - - // SudDomain quota available - second message will be processed.. - SendSubDomainStatus(false); - SendDiskStatusResponse(); - - event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); - UNIT_ASSERT(event != nullptr); - UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus()); + TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(0), false); } Y_UNIT_TEST_F(WriteSubDomainOutOfSpace_IgnoreQuotaDeadline, TPartitionFixture) { - Ctx->Runtime->GetAppData().FeatureFlags.SetEnableTopicDiskSubDomainQuota(true); - Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetQuotaWaitDurationMs(300); - - CreatePartition({ - .Partition=TPartitionId{1}, - .Begin=0, .End=10, - // - // partition configuration - // - .Config={.Version=1, .Consumers={{.Consumer="client-1", .Offset=3}}} - }, - // - // tablet configuration - // - {.Version=2, .Consumers={{.Consumer="client-1"}}}); - - SendSubDomainStatus(true); - - ui64 cookie = 1; - ui64 messageNo = 0; - - SendChangeOwner(cookie, "owner1", Ctx->Edge, true); - auto ownerEvent = Ctx->Runtime->GrabEdgeEvent(TDuration::Seconds(1)); - UNIT_ASSERT(ownerEvent != nullptr); - auto ownerCookie = ownerEvent->Response->GetPartitionResponse().GetCmdGetOwnershipResult().GetOwnerCookie(); - - TAutoPtr handle; - std::function truth = [&](const TEvPQ::TEvProxyResponse& e) { return cookie == e.Cookie; }; - - TString data = "data for write"; - - // First message will be processed because used storage 0 and limit 0. That is, the limit is not exceeded. - SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true); - messageNo++; - SendDiskStatusResponse(); - { - auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); - UNIT_ASSERT(event != nullptr); - } - - // Second message will not be processed because the limit is exceeded. - SendWrite(++cookie, messageNo, ownerCookie, (messageNo + 1) * 100, data, true); - messageNo++; - - SendDiskStatusResponse(); - auto event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); - UNIT_ASSERT(event == nullptr); - - // SudDomain quota available - second message will be processed.. - SendSubDomainStatus(false); - SendDiskStatusResponse(); - - event = Ctx->Runtime->GrabEdgeEventIf(handle, truth, TDuration::Seconds(1)); - UNIT_ASSERT(event != nullptr); - UNIT_ASSERT_EQUAL(NMsgBusProxy::MSTATUS_OK, event->Response->GetStatus()); + TestWriteSubDomainOutOfSpace(TDuration::MilliSeconds(300), true); } Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {