Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix put impl class #2829

Merged
merged 1 commit into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 23 additions & 48 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ void TBlobState::AddPartToPut(ui32 partIdx, TRope&& partData) {
IsChanged = true;
}

void TBlobState::MarkBlobReadyToPut(ui8 blobIdx) {
Y_ABORT_UNLESS(WholeSituation == ESituation::Unknown || WholeSituation == ESituation::Present);
BlobIdx = blobIdx;
IsChanged = true;
}

bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
const TIntervalVec<i32> fullBlobInterval(0, Id.BlobSize());
const TIntervalSet<i32> here = Whole.Here();
Expand Down Expand Up @@ -227,7 +221,7 @@ TString TBlobState::ToString() const {
for (ui32 i = 0; i < Disks.size(); ++i) {
str << Endl << " Disks[" << i << "]# " << Disks[i].ToString() << Endl;
}
str << " BlobIdx# " << (ui32)BlobIdx << Endl;
str << " BlobIdx# " << BlobIdx << Endl;
str << "}";
return str.Str();
}
Expand Down Expand Up @@ -304,7 +298,7 @@ void TGroupDiskRequests::AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui3
}

void TGroupDiskRequests::AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx) {
TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx) {
PutsPending.emplace_back(diskOrderNumber, id, buffer, putReason, isHandoff, blobIdx);
}

Expand Down Expand Up @@ -340,20 +334,6 @@ void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& part
(*this)[id].AddPartToPut(partIdx, std::move(partData));
}

void TBlackboard::MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() == 0);
Y_ABORT_UNLESS(id.BlobSize() != 0);
(*this)[id].MarkBlobReadyToPut(blobIdx);
}

void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() == 0);
Y_ABORT_UNLESS(id.BlobSize() != 0);
DoneBlobStates.insert(BlobStates.extract(id));
}

void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() != 0);
Expand Down Expand Up @@ -390,8 +370,7 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
}

EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
TString errorReason;
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
auto& blob = it->second;
if (!std::exchange(blob.IsChanged, false)) {
Expand All @@ -401,23 +380,19 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec

// recalculate blob outcome if it is not yet determined
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
TString errorReason;
for (IStrategy *strategy : s) {
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
case EStrategyOutcome::IN_PROGRESS:
status = NKikimrProto::UNKNOWN;
break;

case EStrategyOutcome::ERROR:
if (IsAllRequestsTogether) {
if (!finished) {
return res;
}
if (errorReason) {
errorReason += " && ";
errorReason += res.ErrorReason;
} else {
errorReason = res.ErrorReason;
}
status = NKikimrProto::ERROR;
errorReason = std::move(res.ErrorReason);
break;

case EStrategyOutcome::DONE:
Expand All @@ -431,26 +406,25 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
status = NKikimrProto::UNKNOWN;
}
if (status != NKikimrProto::UNKNOWN) {
if (finished) { // we are operating on independent blobs
finished->push_back(TFinishedBlob{
blob.BlobIdx,
status,
std::move(errorReason),
});
}
const auto [doneIt, inserted, node] = DoneBlobStates.insert(BlobStates.extract(it++));
Y_ABORT_UNLESS(inserted);
if (!IsAllRequestsTogether) {
blob.Status = status;
if (finished) {
finished->push_back(&*doneIt);
}
}
} else {
++it;
}
}

EStrategyOutcome outcome(BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS);
outcome.ErrorReason = std::move(errorReason);
return outcome;
return BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS;
}

EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
TBatchedVec<TBlobStates::value_type*> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, finished, expired);
}

Expand All @@ -464,8 +438,7 @@ TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
<< " blobId# " << fullId
<< " BlackBoard# " << ToString());
}
TBlobState &state = it->second;
return state;
return it->second;
}

ssize_t TBlackboard::AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex) {
Expand Down Expand Up @@ -512,8 +485,12 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T
}
}

void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id) {
(*this)[id];
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
const auto [it, inserted] = BlobStates.try_emplace(id);
Y_ABORT_UNLESS(inserted);
TBlobState& state = it->second;
state.Init(id, *Info);
state.BlobIdx = blobIdx;
}

TBlobState& TBlackboard::operator [](const TLogoBlobID& id) {
Expand Down Expand Up @@ -559,9 +536,7 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
const TVDiskID vdiskId = Info->GetVDiskId(orderNumber);
for (auto& [id, state] : BlobStates) {
if (const ui32 diskIdx = Info->GetIdxInSubgroup(vdiskId, id.Hash()); diskIdx != Info->Type.BlobSubgroupSize()) {
TBlobState::TDisk& disk = state.Disks[diskIdx];
for (ui32 partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
TBlobState::TDiskPart& part = disk.DiskParts[partIdx];
for (TBlobState::TDiskPart& part : state.Disks[diskIdx].DiskParts) {
if (part.Situation == TBlobState::ESituation::Present) {
part.Situation = TBlobState::ESituation::Unknown;
if (state.WholeSituation == TBlobState::ESituation::Present) {
Expand Down
30 changes: 15 additions & 15 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,12 @@ struct TBlobState {
TStackVec<TState, TypicalPartsInBlob> Parts;
TStackVec<TDisk, TypicalDisksInSubring> Disks;
TVector<TEvBlobStorage::TEvGetResult::TPartMapItem> PartMap;
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
ui8 BlobIdx;
size_t BlobIdx;
bool IsChanged = false;

void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
void AddPartToPut(ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(ui8 blobIdx = 0);
bool Restore(const TBlobStorageGroupInfo &info);
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
ui32 shift, TRope&& data);
Expand Down Expand Up @@ -133,9 +131,9 @@ struct TDiskPutRequest {
TRope Buffer;
EPutReason Reason;
bool IsHandoff;
ui8 BlobIdx;
size_t BlobIdx;

TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, ui8 blobIdx)
TDiskPutRequest(ui32 orderNumber, const TLogoBlobID &id, TRope buffer, EPutReason reason, bool isHandoff, size_t blobIdx)
: OrderNumber(orderNumber)
, Id(id)
, Buffer(std::move(buffer))
Expand All @@ -152,7 +150,7 @@ struct TGroupDiskRequests {
void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, const TIntervalSet<i32> &intervalSet);
void AddGet(ui32 diskOrderNumber, const TLogoBlobID &id, ui32 shift, ui32 size);
void AddPut(ui32 diskOrderNumber, const TLogoBlobID &id, TRope buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, ui8 blobIdx);
TDiskPutRequest::EPutReason putReason, bool isHandoff, size_t blobIdx);
};

struct TBlackboard;
Expand All @@ -170,6 +168,12 @@ struct TBlackboard {
AccelerationModeSkipMarked
};

struct TFinishedBlob {
size_t BlobIdx;
NKikimrProto::EReplyStatus Status;
TString ErrorReason;
};

using TBlobStates = TMap<TLogoBlobID, TBlobState>;
TBlobStates BlobStates;
TBlobStates DoneBlobStates;
Expand All @@ -179,31 +183,27 @@ struct TBlackboard {
EAccelerationMode AccelerationMode;
const NKikimrBlobStorage::EPutHandleClass PutHandleClass;
const NKikimrBlobStorage::EGetHandleClass GetHandleClass;
const bool IsAllRequestsTogether;

TBlackboard(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass,
bool isAllRequestsTogether = true)
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass)
: Info(info)
, GroupQueues(groupQueues)
, AccelerationMode(AccelerationModeSkipOneSlowest)
, PutHandleClass(putHandleClass)
, GetHandleClass(getHandleClass)
, IsAllRequestsTogether(isAllRequestsTogether)
{}

void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize);
void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data);
void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);

EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
TBatchedVec<TBlobStates::value_type*> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr,
TBatchedVec<TFinishedBlob> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TFinishedBlob> *finished = nullptr,
const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
TBlobState& GetState(const TLogoBlobID &id);
ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex);
Expand All @@ -221,7 +221,7 @@ struct TBlackboard {

void InvalidatePartStates(ui32 orderNumber);

void RegisterBlobForPut(const TLogoBlobID& id);
void RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx);

TBlobState& operator [](const TLogoBlobID& id);
};
Expand Down
35 changes: 15 additions & 20 deletions ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
SanityCheck(); // May Die
}

bool Action() {
bool Action(bool accelerate = false) {
UpdateExpiredVDiskSet();

TPutImpl::TPutResultVec putResults;
PutImpl.Step(LogCtx, putResults, ExpiredVDiskSet);
PutImpl.Step(LogCtx, putResults, ExpiredVDiskSet, accelerate);
if (ReplyAndDieWithLastResponse(putResults)) {
return true;
}
Expand All @@ -133,9 +133,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
return;
}
IsAccelerated = true;

PutImpl.Accelerate(LogCtx);
Action();
Action(true);
// *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size();
}

Expand Down Expand Up @@ -210,9 +208,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
HandleIncarnation(issue, orderNumber, record.GetIncarnationGuid());
}

if (Action()) {
return;
}
Action();
}

void Handle(TEvBlobStorage::TEvVPutResult::TPtr &ev) {
Expand Down Expand Up @@ -265,7 +261,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
if (status == NKikimrProto::BLOCKED || status == NKikimrProto::DEADLINE) {
TString error = TStringBuilder() << "Got VPutResult status# " << status << " from VDiskId# " << vdiskId;
TPutImpl::TPutResultVec putResults;
PutImpl.PrepareOneReply(status, blobId.FullID(), blobIdx, LogCtx, std::move(error), putResults);
PutImpl.PrepareOneReply(status, blobIdx, LogCtx, std::move(error), putResults);
ReplyAndDieWithLastResponse(putResults);
} else {
PutImpl.ProcessResponse(*ev->Get());
Expand Down Expand Up @@ -351,7 +347,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
Y_ABORT_UNLESS(itemStatus != NKikimrProto::RACE); // we should get RACE for the whole request and handle it in CheckForTermErrors
if (itemStatus == NKikimrProto::BLOCKED || itemStatus == NKikimrProto::DEADLINE) {
ErrorReason = TStringBuilder() << "Got VMultiPutResult itemStatus# " << itemStatus << " from VDiskId# " << vdiskId;
PutImpl.PrepareOneReply(itemStatus, blobId.FullID(), blobIdx, LogCtx, ErrorReason, putResults);
PutImpl.PrepareOneReply(itemStatus, blobIdx, LogCtx, ErrorReason, putResults);
}
}
if (ReplyAndDieWithLastResponse(putResults)) {
Expand Down Expand Up @@ -405,7 +401,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
return false;
}

void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult, ui64 blobIdx) {
void SendReply(std::unique_ptr<TEvBlobStorage::TEvPutResult> putResult, size_t blobIdx) {
NKikimrProto::EReplyStatus status = putResult->Status;
A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPP21",
"SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent
Expand Down Expand Up @@ -449,7 +445,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
TString BlobIdSequenceToString() const {
TStringBuilder blobIdsStr;
blobIdsStr << '[';
for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
if (blobIdx) {
blobIdsStr << ' ';
}
Expand Down Expand Up @@ -603,7 +599,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt

StartTime = TActivationContext::Monotonic();

for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
}

Expand Down Expand Up @@ -703,12 +699,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
s << ' ';
}
s << i;
auto& record = IncarnationRecords[i];
s << '{';
s << "IncarnationGuid# " << record.IncarnationGuid;
s << " ExpirationTimestamp# " << record.ExpirationTimestamp;
s << " StatusIssueTimestamp# " << record.StatusIssueTimestamp;
s << '}';
auto& r = IncarnationRecords[i];
s << '{' << r.IncarnationGuid
<< ' ' << (r.ExpirationTimestamp != TMonotonic::Max() ? TStringBuilder() << r.ExpirationTimestamp : "-"_sb)
<< ' ' << (r.StatusIssueTimestamp != TMonotonic::Zero() ? TStringBuilder() << r.StatusIssueTimestamp : "-"_sb)
<< '}';
}
s << '}';
return s.Str();
Expand All @@ -735,7 +730,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

STATEFN(StateWait) {
if (ProcessEvent(ev, IsManyPuts)) {
if (ProcessEvent(ev, true)) {
return;
}
const ui32 type = ev->GetTypeRewrite();
Expand Down
Loading
Loading