Skip to content

Commit

Permalink
Fix scrubbing and flapping unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru committed Feb 6, 2024
1 parent bc5b8c1 commit 11be8af
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 95 deletions.
4 changes: 4 additions & 0 deletions ydb/core/blobstorage/backpressure/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ class TBlobStorageQueue {
return Queues.InFlight.size();
}

// Read-only accessor: returns the current value of the InFlightCost member.
// Does not modify the queue (const member function).
ui64 GetInFlightCost() const {
return InFlightCost;
}

void UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings,
const TBlobStorageGroupType& type);
void InvalidateCosts();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,11 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
<< " msgId# " << msgId << " sequenceId# " << sequenceId
<< " expectedMsgId# " << expectedMsgId << " expectedSequenceId# " << expectedSequenceId
<< " status# " << NKikimrProto::EReplyStatus_Name(status)
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws));
<< " ws# " << NKikimrBlobStorage::TWindowFeedback_EStatus_Name(ws)
<< " InFlightCost# " << Queue.GetInFlightCost()
<< " InFlightCount# " << Queue.InFlightCount()
<< " ItemsWaiting# " << Queue.GetItemsWaiting()
<< " BytesWaiting# " << Queue.GetBytesWaiting());

switch (ws) {
case NKikimrBlobStorage::TWindowFeedback::IncorrectMsgId:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,7 @@ namespace NKikimr {
}
}

TWindowStatus *Processed(bool checkMsgId, const TMessageId &msgId, ui64 cost, TWindowStatus *opStatus) {
Y_UNUSED(checkMsgId);
Y_UNUSED(msgId);
TWindowStatus *Processed(bool /*checkMsgId*/, const TMessageId& /*msgId*/, ui64 cost, TWindowStatus *opStatus) {
Y_ABORT_UNLESS(Cost >= cost);
Cost -= cost;
--InFlight;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,13 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBl
msg->SetId(ReaderTabletData->Id);
msg->SetGeneration(ReaderTabletData->Generation);
}
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber
<< " vget# " << vget->ToString());
}

for (auto& vget : gets) {
if (vget) {
R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# "
<< Info->GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()))
<< " vget# " << vget->ToString());
outVGets.push_back(std::move(vget));
++RequestIndex;
}
Expand Down
115 changes: 73 additions & 42 deletions ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,64 +16,95 @@ void Test() {

TString data = TString::Uninitialized(8_MB);
memset(data.Detach(), 'X', data.size());
TLogoBlobID id(1, 1, 1, 0, data.size(), 0);

{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}
for (ui32 step = 1; step < 100; ++step) {
TLogoBlobID id(1, 1, step, 0, data.size(), 0);

auto checkReadable = [&](NKikimrProto::EReplyStatus status) {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, status);
if (status == NKikimrProto::OK) {
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);
{ // write data to group
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max()));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
}
};

checkReadable(NKikimrProto::OK);
auto checkReadable = [&] {
TActorId sender = runtime->AllocateEdgeActor(1);
runtime->WrapInActorContext(sender, [&] {
SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead));
});
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1);
auto& r = res->Get()->Responses[0];
UNIT_ASSERT_VALUES_EQUAL(r.Status, NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data);

ui32 partsMask = 0;
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TVDiskID& vdiskId = info->GetVDiskId(i);
env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) {
const TActorId sender = runtime->AllocateEdgeActor(1);
auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(),
NKikimrBlobStorage::EGetHandleClass::FastRead);
ev->AddExtremeQuery(id, 0, 0);
runtime->Send(new IEventHandle(queueId, sender, ev.release()), sender.NodeId());
auto reply = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender);
auto& record = reply->Get()->Record;
UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK);
UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1);
for (const auto& result : record.GetResult()) {
if (result.GetStatus() == NKikimrProto::OK) {
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID());
UNIT_ASSERT(id.PartId());
const ui32 partIdx = id.PartId() - 1;
const ui32 mask = 1 << partIdx;
UNIT_ASSERT(!(partsMask & mask));
partsMask |= mask;
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA);
}
}
});
}
UNIT_ASSERT_VALUES_EQUAL(partsMask, (1 << info->Type.TotalPartCount()) - 1);
};

for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);
checkReadable();

ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());
for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) {
const TActorId vdiskActorId = info->GetActorId(i);

const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);
ui32 nodeId, pdiskId;
std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId);
auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId));
Y_ABORT_UNLESS(it != env.PDiskMockStates.end());

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob) {
const TDiskPart& part = item.Location;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size, true);
break;
const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId());
env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvCaptureVDiskLayoutResult>(sender);

for (auto& item : res->Get()->Layout) {
using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult;
if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) {
const TDiskPart& part = item.Location;
it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + 1 + RandomNumber(part.Size), true);
break;
}
}

checkReadable();
}

checkReadable(NKikimrProto::OK);
env.Sim(TDuration::Seconds(60));
}

env.Sim(TDuration::Seconds(60));
}

// Unit test suite ScrubFast: declares a single test case, SingleBlob, whose
// whole body is a call to the file-level Test() helper.
Y_UNIT_TEST_SUITE(ScrubFast) {
Y_UNIT_TEST(SingleBlob) {
Test();
}
}

4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ namespace NKikimr {

// a map to fill upon receiving VGet result
// Per-blob bookkeeping record; instances are the mapped values of VGetResultMap,
// keyed by the blob's TLogoBlobID.
struct TPerBlobInfo {
// NOTE(review): this view mixes pre- and post-commit lines; confirm whether
// this field still exists in the current revision before relying on it.
const TInstant Deadline;
// Weak reference to the originating request context; callers lock() it to
// check that the request is still alive before touching Item.
std::weak_ptr<TInFlightContext> Context;
TEvRecoverBlobResult::TItem *Item; // item to update
ui32 BlobReplyCounter = 0; // number of unreplied queries for this blob
};
std::unordered_multimap<TLogoBlobID, TPerBlobInfo, THash<TLogoBlobID>> VGetResultMap;
std::set<std::tuple<TVDiskIdShort, TLogoBlobID>> GetsInFlight;

void AddBlobQuery(const TLogoBlobID& id, NMatrix::TVectorType needed, const std::shared_ptr<TInFlightContext>& context, TEvRecoverBlobResult::TItem *item);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize);
void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup);
void SendPendingQueries();
void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev);
NKikimrProto::EReplyStatus ProcessItemData(TEvRecoverBlobResult::TItem& item);
Expand Down
111 changes: 73 additions & 38 deletions ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,56 @@ namespace NKikimr {
STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS32, VDISKP(LogPrefix, "AddBlobQuery"), (SelfId, SelfId()),
(Id, id), (Needed, needed), (RequestId, context->RequestId));
const TInstant deadline = context->Iterator->first;
const TBlobStorageGroupType& gtype = Info->Type;
TBlobStorageGroupInfo::TOrderNums nums;
Info->GetTopology().PickSubgroup(id.Hash(), nums);
ui32 blobReplyCounter = 0;
for (ui32 i = 0; i < nums.size(); ++i) {
const TVDiskID& vdiskId = Info->GetVDiskId(i); // obtain VDisk
if (TVDiskIdShort(vdiskId) == VCtx->ShortSelfVDisk) {
continue;
if (TVDiskIdShort(vdiskId) != VCtx->ShortSelfVDisk) {
AddExtremeQuery(vdiskId, id, deadline, i);
++blobReplyCounter;
}
}
VGetResultMap.emplace(id, TPerBlobInfo{context, item, blobReplyCounter});
}

void TBlobRecoveryActor::AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup) {
const auto [_, inserted] = GetsInFlight.emplace(vdiskId, id);

ui32 worstReplySize = 0;
if (inserted) {
const TBlobStorageGroupType& gtype = Info->Type;
switch (TIngress::IngressMode(gtype)) {
case TIngress::EMode::GENERIC:
ui32 maxSize;
maxSize = 0;
if (gtype.GetErasure() == TBlobStorageGroupType::ErasureMirror3dc) {
maxSize += gtype.PartSize(TLogoBlobID(id, i % 3 + 1));
worstReplySize = gtype.PartSize(TLogoBlobID(id, idxInSubgroup % 3 + 1));
} else {
for (ui32 k = 0; k < gtype.TotalPartCount(); ++k) {
maxSize += i >= gtype.TotalPartCount() || k == i ? gtype.PartSize(TLogoBlobID(id, k + 1)) : 0;
worstReplySize += idxInSubgroup >= gtype.TotalPartCount() || k == idxInSubgroup
? gtype.PartSize(TLogoBlobID(id, k + 1)) : 0;
}
}
AddExtremeQuery(vdiskId, id, deadline, maxSize);
break;

case TIngress::EMode::MIRROR3OF4:
AddExtremeQuery(vdiskId, id, deadline, gtype.PartSize(TLogoBlobID(id, 1)) +
gtype.PartSize(TLogoBlobID(id, 2)));
for (ui32 i = 0; i < 2; ++i) {
if (idxInSubgroup % 2 == i || idxInSubgroup >= 4) {
worstReplySize += gtype.PartSize(TLogoBlobID(id, i + 1));
}
}
break;
}
++blobReplyCounter;
}
VGetResultMap.emplace(id, TPerBlobInfo{context->Iterator->first, context, item, blobReplyCounter});
}

void TBlobRecoveryActor::AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize) {
STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS33, VDISKP(LogPrefix, "AddExtremeQuery"), (SelfId, SelfId()),
(VDiskId, vdiskId), (Id, id), (WorstReplySize, worstReplySize));
(VDiskId, vdiskId), (Id, id), (WorstReplySize, worstReplySize), (AlreadyInFlight, !inserted));
if (!inserted) { // the request is already in flight
return;
}

TQuery& query = Queries[vdiskId];

const ui32 maxReplySize = 10000000; // FIXME
const ui32 maxReplySize = 32_MB;
if (query.VGet && query.WorstReplySize + worstReplySize > maxReplySize) { // send the request on overflow
query.Pending.push_back(std::move(query.VGet));
query.WorstReplySize = 0;
Expand Down Expand Up @@ -79,42 +89,67 @@ namespace NKikimr {
STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS35, VDISKP(LogPrefix, "received TEvVGetResult"), (SelfId, SelfId()),
(Msg, ev->Get()->ToString()));

const TInstant now = TActivationContext::Now();
const auto& record = ev->Get()->Record;
const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID());
std::unordered_map<TLogoBlobID, TInstant, THash<TLogoBlobID>> rerequest;
std::unordered_set<TLogoBlobID> done;

for (const auto& res : record.GetResult()) {
const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(res.GetBlobID());
const TLogoBlobID& fullId = id.FullID(); // whole blob id
auto r = VGetResultMap.equal_range(fullId);
for (auto it = r.first; it != r.second; ) {
done.insert(fullId);
const NKikimrProto::EReplyStatus status = res.GetStatus();
auto [begin, end] = VGetResultMap.equal_range(fullId);
for (auto it = begin; it != end; ) {
TPerBlobInfo& info = it->second;
if (auto context = info.Context.lock()) { // context acquired, request is still intact
auto& item = *info.Item; // only here we can access item, after obtaining context pointer
TRope data = ev->Get()->GetBlobData(res);
bool update = false;
if (res.GetStatus() == NKikimrProto::OK && data) {
item.SetPartData(id, std::move(data));
update = true;
}
const bool term = !--info.BlobReplyCounter;
if (item.Status == NKikimrProto::UNKNOWN && (term || update)) {
const NKikimrProto::EReplyStatus prevStatus = std::exchange(item.Status, ProcessItemData(item));
if (item.Status == NKikimrProto::UNKNOWN && term) { // not enough parts to fulfill request
item.Status = NKikimrProto::NODATA;
if (status == NKikimrProto::DEADLINE && now < context->Iterator->first) {
auto& deadline = rerequest[fullId];
deadline = Max(deadline, context->Iterator->first);
} else {
auto& item = *info.Item; // only here we can access item, after obtaining context pointer
TRope data = ev->Get()->GetBlobData(res);
bool update = false;
if (res.GetStatus() == NKikimrProto::OK && data) {
item.SetPartData(id, std::move(data));
update = true;
}
const bool term = !--info.BlobReplyCounter;
if (item.Status == NKikimrProto::UNKNOWN && (term || update)) {
const NKikimrProto::EReplyStatus prevStatus = std::exchange(item.Status, ProcessItemData(item));
if (item.Status == NKikimrProto::UNKNOWN && term) { // not enough parts to fulfill request
item.Status = NKikimrProto::NODATA;
}
STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS36, VDISKP(LogPrefix, "processing item"),
(SelfId, SelfId()), (RequestId, context->RequestId), (Id, id),
(Status, res.GetStatus()), (Last, term), (DataUpdated, update),
(EntryStatus, prevStatus), (ExitStatus, item.Status));
}
if (item.Status != NKikimrProto::UNKNOWN && !--context->NumUnrespondedBlobs) { // request fully completed
context->SendResult(SelfId());
InFlight.erase(context->Iterator);
}
if (term) { // this was the last reply for current blob
it = VGetResultMap.erase(it);
continue;
}
STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS36, VDISKP(LogPrefix, "processing item"),
(SelfId, SelfId()), (RequestId, context->RequestId), (Id, id),
(Status, res.GetStatus()), (Last, term), (DataUpdated, update),
(EntryStatus, prevStatus), (ExitStatus, item.Status));
}
if (item.Status != NKikimrProto::UNKNOWN && !--context->NumUnrespondedBlobs) {
context->SendResult(SelfId());
InFlight.erase(context->Iterator);
}
++it;
} else { // request deadlined or canceled, we erase it from the map
it = VGetResultMap.erase(it);
}
}
}

for (const auto& id : done) {
const size_t n = GetsInFlight.erase(std::make_tuple(vdiskId, id));
Y_DEBUG_ABORT_UNLESS(n == 1);
}
for (const auto& [id, deadline] : rerequest) {
AddExtremeQuery(vdiskId, id, deadline, Info->GetTopology().GetIdxInSubgroup(vdiskId, id.Hash()));
}
SendPendingQueries();
}

NKikimrProto::EReplyStatus TBlobRecoveryActor::ProcessItemData(TEvRecoverBlobResult::TItem& item) {
Expand Down
Loading

0 comments on commit 11be8af

Please sign in to comment.