From a943835d77975ebe9c479d8ecba5617146594edd Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Thu, 28 Dec 2023 21:22:27 +0300 Subject: [PATCH] Introduce strategy test KIKIMR-20527 (#779) --- ydb/core/blobstorage/dsproxy/log_acc.h | 4 + .../dsproxy/ut_strategy/strategy_ut.cpp | 202 ++++++++++++++++++ .../blobstorage/dsproxy/ut_strategy/ya.make | 16 ++ ydb/core/blobstorage/dsproxy/ya.make | 1 + 4 files changed, 223 insertions(+) create mode 100644 ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp create mode 100644 ydb/core/blobstorage/dsproxy/ut_strategy/ya.make diff --git a/ydb/core/blobstorage/dsproxy/log_acc.h b/ydb/core/blobstorage/dsproxy/log_acc.h index f91d310d5480..d29506d0527f 100644 --- a/ydb/core/blobstorage/dsproxy/log_acc.h +++ b/ydb/core/blobstorage/dsproxy/log_acc.h @@ -73,6 +73,7 @@ namespace NKikimr { const TString RequestPrefix; const NKikimrServices::EServiceKikimr LogComponent; TLogAccumulator LogAcc; + bool SuppressLog = false; TLogContext(NKikimrServices::EServiceKikimr logComponent, bool logAccEnabled) : RequestPrefix(Sprintf("[%016" PRIx64 "]", TAppData::RandomProvider->GenRand64())) @@ -142,6 +143,9 @@ namespace NKikimr { #define A_LOG_LOG_SX(logCtx, isRelease, priority, marker, stream) \ do { \ auto& lc = (logCtx); \ + if (lc.SuppressLog) { \ + break; \ + } \ A_LOG_LOG_S_IMPL(isRelease, lc.LogAcc, priority, lc.LogComponent, \ lc.RequestPrefix << " " << stream << " Marker# " << marker); \ } while (false) diff --git a/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp new file mode 100644 index 000000000000..9d7a548dfcdc --- /dev/null +++ b/ydb/core/blobstorage/dsproxy/ut_strategy/strategy_ut.cpp @@ -0,0 +1,202 @@ +#include +#include +#include +#include + +using namespace NActors; +using namespace NKikimr; + +#define Ctest Cnull + +class TGroupModel { + TBlobStorageGroupInfo& Info; + + struct TNotYet {}; + + struct TDiskState { + bool InErrorState = false; + std::map> Blobs; + }; + + std::vector DiskStates; + +public: + TGroupModel(TBlobStorageGroupInfo& info) + : Info(info) + , DiskStates(Info.GetTotalVDisksNum()) + { + for (auto& disk : DiskStates) { + disk.InErrorState = RandomNumber(2 * DiskStates.size()) == 0; + } + } + + TBlobStorageGroupInfo::TGroupVDisks GetFailedDisks() const { + TBlobStorageGroupInfo::TGroupVDisks res = &Info.GetTopology(); + for (ui32 i = 0; i < DiskStates.size(); ++i) { + if (DiskStates[i].InErrorState) { + res |= {&Info.GetTopology(), Info.GetVDiskId(i)}; + } + } + return res; + } + + void ProcessBlackboardRequests(TBlackboard& blackboard) { + for (ui32 i = 0; i < blackboard.GroupDiskRequests.DiskRequestsForOrderNumber.size(); ++i) { + auto& r = blackboard.GroupDiskRequests.DiskRequestsForOrderNumber[i]; + Y_ABORT_UNLESS(i < DiskStates.size()); + auto& disk = DiskStates[i]; + for (auto& get : r.GetsToSend) { + Ctest << "orderNumber# " << i << " get Id# " << get.Id; + if (disk.InErrorState) { + Ctest << " ERROR"; + blackboard.AddErrorResponse(get.Id, i); + } else if (auto it = disk.Blobs.find(get.Id); it == disk.Blobs.end()) { + Ctest << " NODATA"; + blackboard.AddNoDataResponse(get.Id, i); + } else { + std::visit(TOverloaded{ + [&](TNotYet&) { + Ctest << " NOT_YET"; + blackboard.AddNotYetResponse(get.Id, i); + }, + [&](TRope& buffer) { + Ctest << " OK"; + size_t begin = Min(get.Shift, buffer.size()); + size_t end = Min(buffer.size(), begin + get.Size); + TRope data(buffer.begin() + begin, buffer.begin() + end); + blackboard.AddResponseData(get.Id, i, get.Shift, std::move(data)); + } + }, it->second); + } + Ctest << Endl; + } + r.GetsToSend.clear(); + for (auto& put : r.PutsToSend) { + Ctest << "orderNumber# " << i << " put Id# " << put.Id; + if (disk.InErrorState) { + Ctest << " ERROR"; + blackboard.AddErrorResponse(put.Id, i); + } else { + Ctest << " OK"; + disk.Blobs[put.Id] = std::move(put.Buffer); + blackboard.AddPutOkResponse(put.Id, i); + } + Ctest << Endl; + } + r.PutsToSend.clear(); + } + } +}; + +template +void RunStrategyTest(TBlobStorageGroupType type) { + TBlobStorageGroupInfo info(type); + info.Ref(); + TGroupQueues groupQueues(info.GetTopology()); + groupQueues.Ref(); + + std::unordered_map> transitions; + + for (ui32 iter = 0; iter < 1'000'000; ++iter) { + Ctest << "iteration# " << iter << Endl; + + TBlackboard blackboard(&info, &groupQueues, NKikimrBlobStorage::UserData, NKikimrBlobStorage::FastRead); + TString data(1000, 'x'); + TLogoBlobID id(1'000'000'000, 1, 1, 0, data.size(), 0); + std::vector parts(type.TotalPartCount()); + ErasureSplit(TBlobStorageGroupType::CrcModeNone, type, TRope(data), parts); + blackboard.RegisterBlobForPut(id); + for (ui32 i = 0; i < parts.size(); ++i) { + blackboard.AddPartToPut(id, i, TRope(parts[i])); + } + blackboard[id].Whole.Data.Write(0, TRope(data)); + + TLogContext logCtx(NKikimrServices::BS_PROXY, false); + logCtx.SuppressLog = true; + + TGroupModel model(info); + + auto sureFailedDisks = model.GetFailedDisks(); + auto failedDisks = sureFailedDisks; + + auto& state = blackboard[id]; + for (ui32 idxInSubgroup = 0; idxInSubgroup < type.BlobSubgroupSize(); ++idxInSubgroup) { + for (ui32 partIdx = 0; partIdx < type.TotalPartCount(); ++partIdx) { + if (!type.PartFits(partIdx + 1, idxInSubgroup)) { + continue; + } + const ui32 orderNumber = state.Disks[idxInSubgroup].OrderNumber; + const TLogoBlobID partId(id, partIdx + 1); + auto& item = state.Disks[idxInSubgroup].DiskParts[partIdx]; + TBlobStorageGroupInfo::TGroupVDisks diskMask = {&info.GetTopology(), info.GetVDiskId(orderNumber)}; + if (sureFailedDisks & diskMask) { + if (RandomNumber(5u) == 0) { + blackboard.AddErrorResponse(partId, orderNumber); + } + } else { + switch (RandomNumber(100u)) { + case 0: + blackboard.AddErrorResponse(partId, orderNumber); + break; + + case 1: + blackboard.AddNoDataResponse(partId, orderNumber); + break; + + case 2: + blackboard.AddNotYetResponse(partId, orderNumber); + break; + + case 3: + blackboard.AddResponseData(partId, orderNumber, 0, TRope(parts[partIdx])); + break; + } + } + if (item.Situation == TBlobState::ESituation::Error) { + failedDisks |= diskMask; + } + } + } + + Ctest << "initial state# " << state.ToString() << Endl; + + for (;;) { + T strategy; + + TString state = blackboard[id].ToString(); + + auto outcome = blackboard.RunStrategy(logCtx, strategy); + + TString nextState = blackboard[id].ToString(); + if (const auto [it, inserted] = transitions.try_emplace(state, std::make_tuple(outcome, nextState)); !inserted) { + Y_ABORT_UNLESS(it->second == std::make_tuple(outcome, nextState)); + } + + if (outcome == EStrategyOutcome::IN_PROGRESS) { + auto temp = blackboard.RunStrategy(logCtx, strategy); + UNIT_ASSERT_EQUAL(temp, outcome); + UNIT_ASSERT_VALUES_EQUAL(blackboard[id].ToString(), nextState); + } + + if (outcome == EStrategyOutcome::DONE) { + Y_ABORT_UNLESS(info.GetQuorumChecker().CheckFailModelForGroup(sureFailedDisks)); + break; + } else if (outcome == EStrategyOutcome::ERROR) { + Y_ABORT_UNLESS(!info.GetQuorumChecker().CheckFailModelForGroup(failedDisks)); + break; + } else if (outcome != EStrategyOutcome::IN_PROGRESS) { + Y_ABORT("unexpected EStrategyOutcome"); + } + + model.ProcessBlackboardRequests(blackboard); + } + } +} + +Y_UNIT_TEST_SUITE(DSProxyStrategyTest) { + + Y_UNIT_TEST(Restore_block42) { + RunStrategyTest(TBlobStorageGroupType::Erasure4Plus2Block); + } + +} diff --git a/ydb/core/blobstorage/dsproxy/ut_strategy/ya.make b/ydb/core/blobstorage/dsproxy/ut_strategy/ya.make new file mode 100644 index 000000000000..a307788274d6 --- /dev/null +++ b/ydb/core/blobstorage/dsproxy/ut_strategy/ya.make @@ -0,0 +1,16 @@ +UNITTEST() + +TIMEOUT(600) +SIZE(MEDIUM) + +PEERDIR( + ydb/core/blobstorage/dsproxy +) + +YQL_LAST_ABI_VERSION() + +SRCS( + strategy_ut.cpp +) + +END() diff --git a/ydb/core/blobstorage/dsproxy/ya.make b/ydb/core/blobstorage/dsproxy/ya.make index e084ea456974..c4096cbdd681 100644 --- a/ydb/core/blobstorage/dsproxy/ya.make +++ b/ydb/core/blobstorage/dsproxy/ya.make @@ -79,4 +79,5 @@ RECURSE_FOR_TESTS( ut ut_fat ut_ftol + ut_strategy )