Skip to content

Commit

Permalink
Add deadlines for each stage of patching (#5677)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall committed Jul 31, 2024
1 parent e2113ec commit d075cf1
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 43 deletions.
212 changes: 199 additions & 13 deletions ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp

Large diffs are not rendered by default.

91 changes: 71 additions & 20 deletions ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ enum class ENaivePatchCase {
ErrorOnPut,
};

// Expands to a `case` label that returns the stringified spelling of its argument,
// e.g. CASE_TO_RETURN_STRING(ENaivePatchCase::Ok) returns "ENaivePatchCase::Ok".
// The terminating `;` is written at each use site; the macro is #undef'ed after the
// last enum ToString helper that uses it.
#define CASE_TO_RETURN_STRING(cs) \
case cs: return #cs \
// end CASE_TO_RETURN_STRING
// Returns the qualified enumerator name of a naive-patch test case (used in the test log output).
// Values that match no listed enumerator yield a fixed placeholder string.
TString ToString(ENaivePatchCase cs) {
    switch (cs) {
        CASE_TO_RETURN_STRING(ENaivePatchCase::Ok);
        CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGetItem);
        CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnGet);
        CASE_TO_RETURN_STRING(ENaivePatchCase::ErrorOnPut);
    }
    // Deliberately no `default:` label, so the compiler can still report an enumerator
    // missing from the switch. This return prevents control from flowing off the end of
    // a value-returning function, which is undefined behaviour.
    return "ENaivePatchCase::<unknown>";
}

NKikimrProto::EReplyStatus GetPatchResultStatus(ENaivePatchCase naiveCase) {
switch (naiveCase) {
case ENaivePatchCase::Ok:
Expand Down Expand Up @@ -156,6 +168,17 @@ enum class EVPatchCase {
Custom,
};

// Returns the qualified enumerator name of a VPatch test case (used in the test log output).
// Values that match no listed enumerator yield a fixed placeholder string.
TString ToString(EVPatchCase cs) {
    switch (cs) {
        CASE_TO_RETURN_STRING(EVPatchCase::Ok);
        CASE_TO_RETURN_STRING(EVPatchCase::OneErrorAndAllPartExistInStart);
        CASE_TO_RETURN_STRING(EVPatchCase::OnePartLostInStart);
        CASE_TO_RETURN_STRING(EVPatchCase::DeadGroupInStart);
        CASE_TO_RETURN_STRING(EVPatchCase::ErrorDuringVPatchDiff);
        CASE_TO_RETURN_STRING(EVPatchCase::Custom);
    }
    // Deliberately no `default:` label, so the compiler can still report an enumerator
    // missing from the switch. This return prevents control from flowing off the end of
    // a value-returning function, which is undefined behaviour.
    return "EVPatchCase::<unknown>";
}

NKikimrProto::EReplyStatus GetPatchResultStatus(EVPatchCase vpatchCase) {
switch (vpatchCase) {
case EVPatchCase::Ok:
Expand Down Expand Up @@ -249,6 +272,15 @@ enum class EMovedPatchCase {
Error
};

// Returns the qualified enumerator name of a moved-patch test case (used in the test log output).
// Values that match no listed enumerator yield a fixed placeholder string.
TString ToString(EMovedPatchCase cs) {
    switch (cs) {
        CASE_TO_RETURN_STRING(EMovedPatchCase::Ok);
        CASE_TO_RETURN_STRING(EMovedPatchCase::Error);
    }
    // Deliberately no `default:` label, so the compiler can still report an enumerator
    // missing from the switch. This return prevents control from flowing off the end of
    // a value-returning function, which is undefined behaviour.
    return "EMovedPatchCase::<unknown>";
}

#undef CASE_TO_RETURN_STRING

NKikimrProto::EReplyStatus GetPatchResultStatus(EMovedPatchCase movedCase) {
switch (movedCase) {
case EMovedPatchCase::Ok:
Expand Down Expand Up @@ -289,7 +321,7 @@ void ReceivePatchResult(TTestBasicRuntime &runtime, const TTestArgs &args, NKiki
}

void ConductGet(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
CTEST << "ConductGet: Start\n";
CTEST << "ConductGet: Start NaiveCase: " << ToString(naiveCase) << "\n";
NKikimrProto::EReplyStatus resultStatus = GetGetResultStatus(naiveCase);
TAutoPtr<IEventHandle> handle;
TEvBlobStorage::TEvGet *get = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvGet>(handle);
Expand Down Expand Up @@ -328,10 +360,10 @@ TString MakePatchedBuffer(const TTestArgs &args) {
void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
NKikimrProto::EReplyStatus resultStatus = GetPutResultStatus(naiveCase);
if (resultStatus == NKikimrProto::UNKNOWN) {
CTEST << "ConductPut: Skip\n";
CTEST << "ConductPut: Skip NaiveCase: " << ToString(naiveCase) << "\n";
return;
}
CTEST << "ConductPut: Start\n";
CTEST << "ConductPut: Start NaiveCase: " << ToString(naiveCase) << "\n";
TAutoPtr<IEventHandle> handle;
TEvBlobStorage::TEvPut *put = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvPut>(handle);
UNIT_ASSERT_VALUES_EQUAL(put->Id, args.PatchedId);
Expand All @@ -346,22 +378,35 @@ void ConductPut(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCa
}

// Runs one naive-patch scenario: calls ConductGet and ConductPut for `naiveCase`, then passes
// the status returned by GetPatchResultStatus(naiveCase) to ReceivePatchResult.
void ConductNaivePatch(TTestBasicRuntime &runtime, const TTestArgs &args, ENaivePatchCase naiveCase) {
// NOTE(review): two "Start" log statements follow (plain, and with the case name). They look
// like the before/after versions of a single line -- confirm that only the second should remain.
CTEST << "ConductNaivePatch: Start\n";
CTEST << "ConductNaivePatch: Start NaiveCase: " << ToString(naiveCase) << Endl;
ConductGet(runtime, args, naiveCase);
ConductPut(runtime, args, naiveCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(naiveCase);
ReceivePatchResult(runtime, args, resultStatus);
CTEST << "ConductNaivePatch: Finish\n";
}

// Formats a vector as "[a, b, c]": elements are streamed in order and separated by ", ";
// an empty vector produces "[]".
template <typename InnerType>
TString ToString(const TVector<InnerType> &lst) {
    TStringBuilder out;
    out << '[';
    bool isFirst = true;
    for (const auto &item : lst) {
        // The separator goes before every element except the first one.
        if (!isFirst) {
            out << ", ";
        }
        isFirst = false;
        out << item;
    }
    out << ']';
    return out;
}

void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase naiveCase, TVDiskPointer vdiskPointer)
EVPatchCase vpatchCase, TVDiskPointer vdiskPointer)
{
auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.OriginalId.Hash());
CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
CTEST << "ConductVPatchStart: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";
TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.OriginalId.Hash());
auto [status, parts] = GetVPatchFoundPartsStatus(env, args, naiveCase, vdiskPointer);
auto [status, parts] = GetVPatchFoundPartsStatus(env, args, vpatchCase, vdiskPointer);

auto start = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvVPatchStart>({env.VDisks[vdiskIdx]});
auto &startRecord = start->Get()->Record;
Expand All @@ -376,21 +421,22 @@ void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, cons
for (auto partId : parts) {
foundParts->AddPart(partId);
}
CTEST << "ConductVPatchStart: Send FoundParts vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "parts# " << ToString(parts) << "\n";
SendByHandle(runtime, start, std::move(foundParts));
CTEST << "ConductVPatchStart: Finish vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
}

void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase naiveCase, TVDiskPointer vdiskPointer)
EVPatchCase vpatchCase, TVDiskPointer vdiskPointer)
{
auto [vdiskIdx, idxInSubgroup] = vdiskPointer.GetIndecies(env, args.PatchedId.Hash());
TVDiskID vdisk = env.Info->GetVDiskInSubgroup(idxInSubgroup, args.PatchedId.Hash());
NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, naiveCase, vdiskPointer);
NKikimrProto::EReplyStatus resultStatus = GetVPatchResultStatus(env, args, vpatchCase, vdiskPointer);
if (resultStatus == NKikimrProto::UNKNOWN) {
CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
CTEST << "ConductVPatchDiff: Skip vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";
return;
}
CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << "\n";
CTEST << "ConductVPatchDiff: Start vdiskIdx# " << vdiskIdx << " idxInSubgroup# " << idxInSubgroup << " VPatchCase: " << ToString(vpatchCase) << "\n";

auto diffEv = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvVPatchDiff>({env.VDisks[vdiskIdx]});
auto &diffRecord = diffEv->Get()->Record;
Expand All @@ -415,6 +461,7 @@ void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const
}

void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args) {
return; // disabled vpatch
CTEST << "ConductFailedVPatch: Start\n";
for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) {
TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup);
Expand All @@ -429,7 +476,7 @@ void ConductFailedVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con


void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMovedPatchCase movedCase) {
CTEST << "ConductVMovedPatch: Start\n";
CTEST << "ConductVMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl;
NKikimrProto::EReplyStatus resultStatus = GetVMovedPatchResultStatus(movedCase);
TAutoPtr<IEventHandle> handle;
TEvBlobStorage::TEvVMovedPatch *vPatch = runtime.GrabEdgeEventRethrow<TEvBlobStorage::TEvVMovedPatch>(handle);
Expand Down Expand Up @@ -459,7 +506,7 @@ void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMove
void ConductMovedPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EMovedPatchCase movedCase)
{
CTEST << "ConductMovedPatch: Start\n";
CTEST << "ConductMovedPatch: Start MovedPatchCase: " << ToString(movedCase) << Endl;
ConductFailedVPatch(runtime, env, args);
ConductVMovedPatch(runtime, args, movedCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(movedCase);
Expand All @@ -481,7 +528,8 @@ void ConductFallbackPatch(TTestBasicRuntime &runtime, const TTestArgs &args) {
void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase vpatchCase)
{
CTEST << "ConductVPatchEvents: Start\n";
return; // disabled vpatch
CTEST << "ConductVPatchEvents: Start VPatchCase: " << ToString(vpatchCase) << Endl;
for (ui32 idxInSubgroup = 0; idxInSubgroup < args.GType.BlobSubgroupSize(); ++idxInSubgroup) {
TVDiskPointer vdisk = TVDiskPointer::GetVDiskIdx(idxInSubgroup);
ConductVPatchStart(runtime, env, args, vpatchCase, vdisk);
Expand All @@ -496,7 +544,7 @@ void ConductVPatchEvents(TTestBasicRuntime &runtime, const TDSProxyEnv &env, con
void ConductVPatch(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const TTestArgs &args,
EVPatchCase vpatchCase)
{
CTEST << "ConductFallbackPatch: Start\n";
CTEST << "ConductFallbackPatch: Start VPatchCase: " << ToString(vpatchCase) << Endl;
ConductVPatchEvents(runtime, env, args, vpatchCase);
NKikimrProto::EReplyStatus resultStatus = GetPatchResultStatus(vpatchCase);
if (resultStatus == NKikimrProto::UNKNOWN) {
Expand Down Expand Up @@ -620,17 +668,18 @@ void RunGeneralTest(void(*runner)(TTestBasicRuntime &runtime, const TTestArgs &a
Y_UNIT_TEST_NAIVE(ErrorOnPut, erasure) \
Y_UNIT_TEST_MOVED(Ok, erasure) \
Y_UNIT_TEST_MOVED(Error, erasure) \
Y_UNIT_TEST_VPATCH(Ok, erasure) \
Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure) \
Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure) \
Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure) \
Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure) \
Y_UNIT_TEST_SECURED(Ok, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnGetItem, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnGet, erasure) \
Y_UNIT_TEST_SECURED(ErrorOnPut, erasure) \
// end Y_UNIT_TEST_PATCH_PACK

// Y_UNIT_TEST_VPATCH(Ok, erasure)
// Y_UNIT_TEST_VPATCH(OneErrorAndAllPartExistInStart, erasure)
// Y_UNIT_TEST_VPATCH(OnePartLostInStart, erasure)
// Y_UNIT_TEST_VPATCH(DeadGroupInStart, erasure)
// Y_UNIT_TEST_VPATCH(ErrorDuringVPatchDiff, erasure)

Y_UNIT_TEST_PATCH_PACK(ErasureNone)
Y_UNIT_TEST_PATCH_PACK(Erasure4Plus2Block)
Y_UNIT_TEST_PATCH_PACK(ErasureMirror3dc)
Expand Down Expand Up @@ -712,6 +761,7 @@ EFaultToleranceCase GetFaultToleranceCaseForBlock4Plus2(const TDSProxyEnv &env,
}
}
}
return EFaultToleranceCase::Fallback; // disabled vpatch
if (layout.CountEffectiveReplicas(env.Info->Type) == env.Info->Type.TotalPartCount()) {
return EFaultToleranceCase::Ok;
} else {
Expand All @@ -736,6 +786,7 @@ EFaultToleranceCase GetFaultToleranceCaseForMirror3dc(const TDSProxyEnv &env, co
for (ui32 dcIdx = 0; dcIdx < dcCnt; ++dcIdx) {
x2cnt += (replInDc[dcIdx] >= 2);
}
return EFaultToleranceCase::Fallback; // disabled vpatch
if ((replInDc[0] && replInDc[1] && replInDc[2]) || x2cnt >= 2) {
return EFaultToleranceCase::Ok;
} else {
Expand Down
78 changes: 77 additions & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -1585,7 +1585,7 @@ namespace NKikimr {
if (deadline != TInstant::Max()) {
this->Record.MutableMsgQoS()->SetDeadlineSeconds((ui32)deadline.Seconds());
}
this->Record.MutableMsgQoS()->SetExtQueueId(HandleClassToQueueId(NKikimrBlobStorage::AsyncBlob));
this->Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::PutAsyncBlob);
}

bool GetIgnoreBlock() const {
Expand Down Expand Up @@ -1965,6 +1965,25 @@ namespace NKikimr {
}
Record.MutableMsgQoS()->SetExtQueueId(NKikimrBlobStorage::EVDiskQueueId::GetFastRead);
}

// Returns the textual form of this event by delegating to the static ToString overload
// with the event's own Record.
// NOTE(review): if the event base class declares a virtual ToString(), this probably wants
// `override` -- confirm against the base class.
TString ToString() const {
return ToString(this->Record);
}

// Formats a TEvVPatchStart record as
//   {TEvVPatchStart OriginalBlobId# <id> PatchedBlobId# <id>[ <MsgQoS>]}
// The MsgQoS part is written by TEvVPut::OutMsgQos and only when the record has MsgQoS set.
static TString ToString(const NKikimrBlobStorage::TEvVPatchStart &record) {
    TLogoBlobID original = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId());
    TLogoBlobID patched = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId());

    TStringStream out;
    out << "{TEvVPatchStart"
        << " OriginalBlobId# " << original.ToString()
        << " PatchedBlobId# " << patched.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}
};

struct TEvBlobStorage::TEvVPatchFoundParts
Expand Down Expand Up @@ -2010,6 +2029,25 @@ namespace NKikimr {
Record.SetStatus(status);
}

// Returns the textual form of this event by delegating to the static ToString overload
// with the event's own Record.
// NOTE(review): if the event base class declares a virtual ToString(), this probably wants
// `override` -- confirm against the base class.
TString ToString() const {
return ToString(this->Record);
}

// Formats a TEvVPatchFoundParts record as
//   {TEvVPatchFoundParts OriginalBlobId# <id> PatchedBlobId# <id>[ <MsgQoS>]}
// The MsgQoS part is written by TEvVPut::OutMsgQos and only when the record has MsgQoS set.
static TString ToString(const NKikimrBlobStorage::TEvVPatchFoundParts &record) {
    TLogoBlobID original = LogoBlobIDFromLogoBlobID(record.GetOriginalBlobId());
    TLogoBlobID patched = LogoBlobIDFromLogoBlobID(record.GetPatchedBlobId());

    TStringStream out;
    out << "{TEvVPatchFoundParts"
        << " OriginalBlobId# " << original.ToString()
        << " PatchedBlobId# " << patched.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}

void MakeError(NKikimrProto::EReplyStatus status, const TString& errorReason,
const NKikimrBlobStorage::TEvVPatchStart &request) {
Record.SetErrorReason(errorReason);
Expand Down Expand Up @@ -2099,6 +2137,25 @@ namespace NKikimr {
}
return result;
}

// Returns the textual form of this event by delegating to the static ToString overload
// with the event's own Record.
// NOTE(review): if the event base class declares a virtual ToString(), this probably wants
// `override` -- confirm against the base class.
TString ToString() const {
return ToString(this->Record);
}

// Formats a TEvVPatchDiff record as
//   {TEvVPatchDiff OriginalBlobId# <id> PatchedBlobId# <id>[ <MsgQoS>]}
// The ids printed are the record's OriginalPartBlobId / PatchedPartBlobId fields.
// The MsgQoS part is written by TEvVPut::OutMsgQos and only when the record has MsgQoS set.
static TString ToString(const NKikimrBlobStorage::TEvVPatchDiff &record) {
    TLogoBlobID originalPart = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId());
    TLogoBlobID patchedPart = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId());

    TStringStream out;
    out << "{TEvVPatchDiff"
        << " OriginalBlobId# " << originalPart.ToString()
        << " PatchedBlobId# " << patchedPart.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}
};


Expand Down Expand Up @@ -2144,6 +2201,25 @@ namespace NKikimr {
}
return result;
}

// Returns the textual form of this event by delegating to the static ToString overload
// with the event's own Record.
// NOTE(review): if the event base class declares a virtual ToString(), this probably wants
// `override` -- confirm against the base class.
TString ToString() const {
return ToString(this->Record);
}

// Formats a TEvVPatchXorDiff record as
//   {TEvVPatchXorDiff OriginalBlobId# <id> PatchedBlobId# <id>[ <MsgQoS>]}
// The ids printed are the record's OriginalPartBlobId / PatchedPartBlobId fields.
// The MsgQoS part is written by TEvVPut::OutMsgQos and only when the record has MsgQoS set.
static TString ToString(const NKikimrBlobStorage::TEvVPatchXorDiff &record) {
    TLogoBlobID originalPart = LogoBlobIDFromLogoBlobID(record.GetOriginalPartBlobId());
    TLogoBlobID patchedPart = LogoBlobIDFromLogoBlobID(record.GetPatchedPartBlobId());

    TStringStream out;
    out << "{TEvVPatchXorDiff"
        << " OriginalBlobId# " << originalPart.ToString()
        << " PatchedBlobId# " << patchedPart.ToString();
    if (record.HasMsgQoS()) {
        out << " ";
        TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), out);
    }
    out << "}";
    return out.Str();
}
};

struct TEvBlobStorage::TEvVPatchXorDiffResult
Expand Down
Loading

0 comments on commit d075cf1

Please sign in to comment.