Skip to content

Commit

Permalink
KIKIMR-20522: Tests for vdisks balancing (ydb-platform#531)
Browse files Browse the repository at this point in the history
Co-authored-by: robdrynkin <robdrynkin@nebius.com>
  • Loading branch information
robdrynkin and robdrynkin authored Dec 22, 2023
1 parent 86194b9 commit 7116d46
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 1 deletion.
305 changes: 305 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/balancing.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,305 @@
#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h>

#include <library/cpp/iterator/enumerate.h>

#include <util/random/entropy.h>


// Layout of one blob's parts across the group.
// Outer index: disk position in the group (the same index TTestEnv passes to GetVDiskId / GetParts).
// Inner vector: ids of the blob parts expected or found at that position.
using TPartsLocations = TVector<TVector<ui8>>;


struct TTestEnv {
TTestEnv(ui32 nodeCount, TBlobStorageGroupType erasure)
: Env({
.NodeCount = nodeCount,
.VDiskReplPausedAtStart = false,
.Erasure = erasure,
})
{
Env.CreateBoxAndPool(1, 1);
Env.Sim(TDuration::Minutes(1));

auto groups = Env.GetGroups();
UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1);
GroupInfo = Env.GetGroupInfo(groups.front());

for (ui32 i = 0; i < Env.Settings.NodeCount; ++i) {
RunningNodes.insert(i);
}
}

static TString PrepareData(const ui32 dataLen, const ui32 start) {
TString data(Reserve(dataLen));
for (ui32 i = 0; i < dataLen; ++i) {
data.push_back('a' + (start + i) % 26);
}
return data;
};

void SendPut(ui32 step, const TString& data, NKikimrProto::EReplyStatus expectedStatus) {
const TLogoBlobID id(1, 1, step, 0, data.size(), 0);
Cerr << "SEND TEvPut with key " << id.ToString() << Endl;
const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, data, TInstant::Max());
Env.Runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, GroupInfo->GroupID, ev.release());
});
auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender, false);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, expectedStatus);
Cerr << "TEvPutResult: " << res->Get()->ToString() << Endl;
};

auto SendGet(ui32 step, ui32 dataSize, bool mustRestoreFirst=false) {
const TLogoBlobID blobId(1, 1, step, 0, dataSize, 0);
Cerr << "SEND TEvGet with key " << blobId.ToString() << Endl;
const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
auto ev = std::make_unique<TEvBlobStorage::TEvGet>(
blobId,
/* shift */ 0,
/* size */ dataSize,
TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead,
mustRestoreFirst
);
Env.Runtime->WrapInActorContext(sender, [&] () {
SendToBSProxy(sender, GroupInfo->GroupID, ev.release());
});
TInstant getDeadline = Env.Now() + TDuration::Seconds(30);
auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, /* termOnCapture */ false, getDeadline);
Cerr << "TEvGetResult: " << res->Get()->ToString() << Endl;
return res;
};

TActorId GetQueue(const TVDiskID& vDiskId) {
if (!Queues.contains(vDiskId)) {
Queues[vDiskId] = Env.CreateQueueActor(vDiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, 1000);
}
return Queues[vDiskId];
}

TVector<ui32> GetParts(ui32 position, const TLogoBlobID& blobId) {
if (!RunningNodes.contains(position)) {
return {};
}
auto vDiskId = GroupInfo->GetVDiskId(position);
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery(
vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead,
TEvBlobStorage::TEvVGet::EFlags::None, 0,
{{blobId, 0, 0}}
);
const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__);
TVector<ui32> partsRes;

Cerr << "Get request for vdisk " << position << Endl;
auto queueId = GetQueue(vDiskId);
Env.Runtime->WrapInActorContext(sender, [&] {
Env.Runtime->Send(new IEventHandle(queueId, sender, ev.release()));
});
auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender, false);
auto parts = res->Get()->Record.GetResult().at(0).GetParts();
partsRes = TVector<ui32>(parts.begin(), parts.end());
return partsRes;
}

TPartsLocations GetExpectedPartsLocations(const TLogoBlobID& blobId) {
TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize());
TBlobStorageGroupInfo::TOrderNums orderNums;
GroupInfo->GetTopology().PickSubgroup(blobId.Hash(), orderNums);
for (ui32 i = 0; i < GroupInfo->GetTopology().GType.TotalPartCount(); ++i) {
result[orderNums[i]].push_back(i + 1);
}
return result;
}

TPartsLocations GetActualPartsLocations(const TLogoBlobID& blobId) {
TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize());
for (ui32 i = 0; i < result.size(); ++i) {
for (ui32 part: GetParts(i, blobId)) {
result[i].push_back(part);
}
Sort(result[i].begin(), result[i].end());
}
return result;
}

bool CheckPartsLocations(const TLogoBlobID& blobId) {
auto expectedParts = GetExpectedPartsLocations(blobId);
auto actualParts = GetActualPartsLocations(blobId);
UNIT_ASSERT_VALUES_EQUAL(expectedParts.size(), actualParts.size());

for (ui32 i = 0; i < expectedParts.size(); ++i) {
UNIT_ASSERT_VALUES_EQUAL(expectedParts[i].size(), actualParts[i].size());
for (ui32 j = 0; j < expectedParts[i].size(); ++j) {
UNIT_ASSERT_VALUES_EQUAL(expectedParts[i][j], actualParts[i][j]);
}
}

return true;
}

void StopNode(ui32 position) {
if (!RunningNodes.contains(position)) {
return;
}
Env.StopNode(GroupInfo->GetActorId(position).NodeId());
RunningNodes.erase(position);
}

void StartNode(ui32 position) {
if (RunningNodes.contains(position)) {
return;
}
Env.StartNode(GroupInfo->GetActorId(position).NodeId());
RunningNodes.insert(position);
for (auto [_, queueId]: Queues) {
Env.Runtime->Send(new IEventHandle(TEvents::TSystem::Poison, 0, queueId, {}, nullptr, 0), queueId.NodeId());
}
Queues.clear();
}

TEnvironmentSetup* operator->() {
return &Env;
}

TEnvironmentSetup Env;
TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo;
THashSet<ui32> RunningNodes;
THashMap<TVDiskID, TActorId> Queues;
};

// Builds the blob id used throughout these tests: the first, second, fourth and
// last constructor arguments are fixed to 1, 1, 0 and 0; only step and size vary.
TLogoBlobID MakeLogoBlobId(ui32 step, ui32 dataSize) {
    TLogoBlobID blobId(1, 1, step, 0, dataSize, 0);
    return blobId;
}


// Returns a string of `len` bytes read from the entropy pool.
TString GenData(ui32 len) {
    TString payload = TString::Uninitialized(len);
    auto rawBytes = payload.Detach();
    EntropyPool().Read(rawBytes, payload.size());
    return payload;
}


struct TStopOneNodeTest {
TTestEnv Env;
TString data;

void RunTest() {
ui32 step = 0;

{ // Check just a normal put works
Env.SendPut(++step, data, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data);
Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
}


{ // Stop one node that should have a part, make put, start it and check that blob would be moved from handoff on main
auto blobId = MakeLogoBlobId(++step, data.size());
auto locations = Env.GetExpectedPartsLocations(blobId);
ui32 nodeIdWithBlob = 0;
while (locations[nodeIdWithBlob].size() == 0) ++nodeIdWithBlob;

Env.StopNode(nodeIdWithBlob);
Env.SendPut(step, data, NKikimrProto::OK);
Env->Sim(TDuration::Seconds(10));
Env.StartNode(nodeIdWithBlob);
Env->Sim(TDuration::Seconds(10));
Env.CheckPartsLocations(MakeLogoBlobId(step, data.size()));
UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data);
}
}
};

// Scenario: NumIters puts of random-sized blobs interleaved with random node
// stops, node starts, VDisk compactions and wipes. At the end all nodes are
// started and every blob is checked for expected part placement and content.
struct TRandomTest {
    TTestEnv Env;
    ui32 NumIters;  // number of blobs to write; blob `step` ids run from 0 to NumIters - 1

    void RunTest() {
        TVector<TString> data(Reserve(NumIters));

        for (ui32 step = 0; step < NumIters; ++step) {
            Cerr << step << Endl;
            data.push_back(GenData(16 + random() % 4096));

            // A stop is only attempted while fewer than two positions are down,
            // so at most two positions are stopped at any time.
            if (random() % 10 == 1 && Env.RunningNodes.size() + 2 > Env->Settings.NodeCount) {
                ui32 nodeId = random() % Env->Settings.NodeCount;
                Cerr << "Stop node " << nodeId << Endl;
                Env.StopNode(nodeId);
                Env->Sim(TDuration::Seconds(10));
            }

            Env.SendPut(step, data.back(), NKikimrProto::OK);

            // Occasionally bring back the first stopped position, if any.
            if (random() % 10 == 1) {
                for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
                    if (!Env.RunningNodes.contains(pos)) {
                        Cerr << "Start node " << pos << Endl;
                        Env.StartNode(pos);
                        Env->Sim(TDuration::Seconds(10));
                        break;
                    }
                }
            }

            // Occasionally compact the VDisk at a random running position.
            if (random() % 50 == 1) {
                ui32 pos = random() % Env->Settings.NodeCount;
                if (Env.RunningNodes.contains(pos)) {
                    Env->CompactVDisk(Env.GroupInfo->GetActorId(pos));
                    Env->Sim(TDuration::Seconds(10));
                }
            }

            // Wipe random node
            if (random() % 100 == 1) {
                ui32 pos = random() % Env->Settings.NodeCount;
                if (Env.RunningNodes.contains(pos)) {
                    auto baseConfig = Env->FetchBaseConfig();
                    // NOTE(review): indexes PDisk/VSlot entries of the base config by group
                    // position; presumably their order matches positions — confirm.
                    const auto& somePDisk = baseConfig.GetPDisk(pos);
                    const auto& someVSlot = baseConfig.GetVSlot(pos);
                    Env->Wipe(somePDisk.GetNodeId(), somePDisk.GetPDiskId(), someVSlot.GetVSlotId().GetVSlotId());
                    Env->Sim(TDuration::Seconds(10));
                }
            }
        }

        // StartNode is a no-op for positions that are already running.
        for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) {
            Env.StartNode(pos);
        }

        Env->Sim(TDuration::Seconds(300));
        Cerr << "Start checking" << Endl;
        for (ui32 step = 0; step < NumIters; ++step) {
            Cerr << step << Endl;
            Env.CheckPartsLocations(MakeLogoBlobId(step, data[step].size()));
            UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step]);
        }
    }
};



// Balancing scenarios for two erasure types: Erasure4Plus2Block on 8 nodes and
// ErasureMirror3dc on 9 nodes. The stop-one-node scenario runs with a 100-byte
// and a 521 KB payload; the random scenario runs 1000 iterations.
Y_UNIT_TEST_SUITE(VDiskBalancing) {

    Y_UNIT_TEST(TestStopOneNode_Block42) {
        TStopOneNodeTest scenario{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(100)};
        scenario.RunTest();
    }
    Y_UNIT_TEST(TestStopOneNode_Mirror3dc) {
        TStopOneNodeTest scenario{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(100)};
        scenario.RunTest();
    }
    Y_UNIT_TEST(TestStopOneNode_Block42_HugeBlob) {
        TStopOneNodeTest scenario{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(521_KB)};
        scenario.RunTest();
    }
    Y_UNIT_TEST(TestStopOneNode_Mirror3dc_HugeBlob) {
        TStopOneNodeTest scenario{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(521_KB)};
        scenario.RunTest();
    }

    Y_UNIT_TEST(TestRandom_Block42) {
        TRandomTest scenario{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), 1000};
        scenario.RunTest();
    }
    Y_UNIT_TEST(TestRandom_Mirror3dc) {
        TRandomTest scenario{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), 1000};
        scenario.RunTest();
    }

}
15 changes: 15 additions & 0 deletions ydb/core/blobstorage/ut_blobstorage/ut_balancing/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Unit-test module for ydb/core/blobstorage/ut_blobstorage that builds the
# VDisk balancing tests from balancing.cpp.
UNITTEST_FOR(ydb/core/blobstorage/ut_blobstorage)

SIZE(MEDIUM)

# NOTE(review): presumably seconds — confirm against the build system docs.
TIMEOUT(600)

SRCS(
balancing.cpp
)

# Shared test environment library (balancing.cpp includes lib/env.h from it).
PEERDIR(
ydb/core/blobstorage/ut_blobstorage/lib
)

END()
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_blobstorage/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ REQUIREMENTS(ram:32)
END()

RECURSE_FOR_TESTS(
ut_balancing
ut_blob_depot
ut_blob_depot_fat
ut_donor
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/util/testactorsys.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class TTestActorSystem {
}

// Stores the incoming event into the slot pointed to by HandlePtr.
// Aborts if no capture slot is set (the message names the event type and this
// actor's tag) or if the slot already holds an event.
void StateFunc(TAutoPtr<IEventHandle>& ev) {
    Y_ABORT_UNLESS(HandlePtr, "event %s is not being captured by this actor Tag# %s", ev->GetTypeName().data(), Tag.data());
    Y_ABORT_UNLESS(!*HandlePtr);
    HandlePtr->reset(ev.Release());
}
Expand Down

0 comments on commit 7116d46

Please sign in to comment.