Skip to content

Commit

Permalink
Untie Keep/DoNotKeep flags from blackboard KIKIMR-20527 (ydb-platform…
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored and adameat committed Dec 29, 2023
1 parent 8dcfdd8 commit adafae7
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 25 deletions.
19 changes: 6 additions & 13 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) {
}

void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
ui32 shift, TRope&& data) {
// Add actual data to Parts
Y_ABORT_UNLESS(id.PartId() != 0);
const ui32 partIdx = id.PartId() - 1;
Expand All @@ -112,9 +112,6 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB
TIntervalVec<i32> responseInterval(shift, shift + dataSize);
diskPart.Requested.Subtract(responseInterval);
}

Keep |= keep;
DoNotKeep |= doNotKeep;
}

void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
Expand Down Expand Up @@ -164,8 +161,7 @@ void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogo
diskPart.Requested.Clear();
}

void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber,
bool keep, bool doNotKeep) {
void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) {
Y_ABORT_UNLESS(id.PartId() != 0);
const ui32 partIdx = id.PartId() - 1;
IsChanged = true;
Expand All @@ -179,9 +175,6 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog
TDiskPart &diskPart = disk.DiskParts[partIdx];
diskPart.Situation = ESituation::Lost;
diskPart.Requested.Clear();

Keep |= keep;
DoNotKeep |= doNotKeep;
}

ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
Expand Down Expand Up @@ -377,11 +370,11 @@ void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) {
state.AddPutOkResponse(*Info, id, orderNumber);
}

void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep) {
void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() != 0);
TBlobState &state = GetState(id);
state.AddResponseData(*Info, id, orderNumber, shift, std::move(data), keep, doNotKeep);
state.AddResponseData(*Info, id, orderNumber, shift, std::move(data));
}

void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
Expand All @@ -391,11 +384,11 @@ void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) {
state.AddNoDataResponse(*Info, id, orderNumber);
}

void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep) {
void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber) {
Y_ABORT_UNLESS(bool(id));
Y_ABORT_UNLESS(id.PartId() != 0);
TBlobState &state = GetState(id);
state.AddNotYetResponse(*Info, id, orderNumber, keep, doNotKeep);
state.AddNotYetResponse(*Info, id, orderNumber);
}

void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,18 @@ struct TBlobState {
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
ui8 BlobIdx;
bool IsChanged = false;
bool Keep = false;
bool DoNotKeep = false;

void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
void AddPartToPut(ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(ui8 blobIdx = 0);
bool Restore(const TBlobStorageGroupInfo &info);
void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
ui32 shift, TRope&& data, bool keep, bool doNotKeep);
ui32 shift, TRope&& data);
void AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
void AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring,
bool keep, bool doNotKeep);
void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring);
ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
Expand Down Expand Up @@ -204,11 +201,11 @@ struct TBlackboard {
void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data);
void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber);
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep);
void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber);
EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec<IStrategy*, 1>& strategies,
TBatchedVec<TBlobStates::value_type*> *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr);
EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec<TBlobStates::value_type*> *finished = nullptr,
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas
const TBlobState &blobState = Blackboard.GetState(query.Id);
outResponse.Id = query.Id;
outResponse.PartMap = blobState.PartMap;
outResponse.Keep = blobState.Keep;
outResponse.DoNotKeep = blobState.DoNotKeep;
outResponse.LooksLikePhantom = PhantomCheck
? std::make_optional(blobState.WholeSituation == TBlobState::ESituation::Absent)
: std::nullopt;

// fill in keep/doNotKeep flags
const auto it = BlobFlags.find(query.Id);
std::tie(outResponse.Keep, outResponse.DoNotKeep) = it != BlobFlags.end() ? it->second : std::make_tuple(false, false);

if (blobState.WholeSituation == TBlobState::ESituation::Absent) {
bool okay = true;

Expand Down
13 changes: 10 additions & 3 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class TGetImpl {

std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData;

std::unordered_map<TLogoBlobID, std::tuple<bool, bool>> BlobFlags; // keep, doNotKeep per blob

public:
TGetImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &groupQueues,
TEvBlobStorage::TEvGet *ev, TNodeLayoutInfoPtr&& nodeLayout, const TString& requestPrefix = {})
Expand Down Expand Up @@ -219,6 +221,12 @@ class TGetImpl {
}
}

if (result.HasKeep() || result.HasDoNotKeep()) {
auto& [a, b] = BlobFlags[blobId];
a |= result.GetKeep();
b |= result.GetDoNotKeep();
}

if (replyStatus == NKikimrProto::OK) {
// TODO(cthulhu): Verify shift and response size, and cookie
R_LOG_DEBUG_SX(logCtx, "BPG58", "Got# OK orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString());
Expand All @@ -228,8 +236,7 @@ class TGetImpl {
resultBuffer.ExtractFrontPlain(temp.GetDataMut(), temp.size());
resultBuffer.Insert(resultBuffer.End(), std::move(temp));
}
Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer), result.GetKeep(),
result.GetDoNotKeep());
Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer));
} else if (replyStatus == NKikimrProto::NODATA) {
R_LOG_DEBUG_SX(logCtx, "BPG59", "Got# NODATA orderNumber# " << orderNumber
<< " vDiskId# " << vdisk.ToString());
Expand All @@ -243,7 +250,7 @@ class TGetImpl {
} else if (replyStatus == NKikimrProto::NOT_YET) {
R_LOG_DEBUG_SX(logCtx, "BPG67", "Got# NOT_YET orderNumber# " << orderNumber
<< " vDiskId# " << vdisk.ToString());
Blackboard.AddNotYetResponse(blobId, orderNumber, result.GetKeep(), result.GetDoNotKeep());
Blackboard.AddNotYetResponse(blobId, orderNumber);
} else {
Y_ABORT_UNLESS(false, "Unexpected reply status# %s", NKikimrProto::EReplyStatus_Name(replyStatus).data());
}
Expand Down

0 comments on commit adafae7

Please sign in to comment.