diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 09144e50e806..9b49630b4f8d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -87,7 +87,7 @@ bool TBlobState::Restore(const TBlobStorageGroupInfo &info) { } void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber, - ui32 shift, TRope&& data, bool keep, bool doNotKeep) { + ui32 shift, TRope&& data) { // Add actual data to Parts Y_ABORT_UNLESS(id.PartId() != 0); const ui32 partIdx = id.PartId() - 1; @@ -112,9 +112,6 @@ void TBlobState::AddResponseData(const TBlobStorageGroupInfo &info, const TLogoB TIntervalVec responseInterval(shift, shift + dataSize); diskPart.Requested.Subtract(responseInterval); } - - Keep |= keep; - DoNotKeep |= doNotKeep; } void TBlobState::AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) { @@ -164,8 +161,7 @@ void TBlobState::AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogo diskPart.Requested.Clear(); } -void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber, - bool keep, bool doNotKeep) { +void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber) { Y_ABORT_UNLESS(id.PartId() != 0); const ui32 partIdx = id.PartId() - 1; IsChanged = true; @@ -179,9 +175,6 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog TDiskPart &diskPart = disk.DiskParts[partIdx]; diskPart.Situation = ESituation::Lost; diskPart.Requested.Clear(); - - Keep |= keep; - DoNotKeep |= doNotKeep; } ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, @@ -377,11 +370,11 @@ void TBlackboard::AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber) { state.AddPutOkResponse(*Info, id, orderNumber); } -void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep) { +void TBlackboard::AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data) { Y_ABORT_UNLESS(bool(id)); Y_ABORT_UNLESS(id.PartId() != 0); TBlobState &state = GetState(id); - state.AddResponseData(*Info, id, orderNumber, shift, std::move(data), keep, doNotKeep); + state.AddResponseData(*Info, id, orderNumber, shift, std::move(data)); } void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) { @@ -391,11 +384,11 @@ void TBlackboard::AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber) { state.AddNoDataResponse(*Info, id, orderNumber); } -void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep) { +void TBlackboard::AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber) { Y_ABORT_UNLESS(bool(id)); Y_ABORT_UNLESS(id.PartId() != 0); TBlobState &state = GetState(id); - state.AddNotYetResponse(*Info, id, orderNumber, keep, doNotKeep); + state.AddNotYetResponse(*Info, id, orderNumber); } void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index 003ba6dc819d..41b18eaf0869 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -84,8 +84,6 @@ struct TBlobState { NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN; ui8 BlobIdx; bool IsChanged = false; - bool Keep = false; - bool DoNotKeep = false; void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info); void AddNeeded(ui64 begin, ui64 size); @@ -93,12 +91,11 @@ struct TBlobState { void MarkBlobReadyToPut(ui8 blobIdx = 0); bool Restore(const TBlobStorageGroupInfo &info); void AddResponseData(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring, - ui32 shift, TRope&& data, bool keep, bool doNotKeep); + ui32 shift, TRope&& data); void AddPutOkResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 orderNumber); void AddNoDataResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring); void AddErrorResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring); - void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring, - bool keep, bool doNotKeep); + void AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLogoBlobID &id, ui32 diskIdxInSubring); ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const; void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, @@ -204,11 +201,11 @@ struct TBlackboard { void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TRope&& partData); void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0); void MoveBlobStateToDone(const TLogoBlobID &id); - void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data, bool keep, bool doNotKeep); + void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TRope&& data); void AddPutOkResponse(const TLogoBlobID &id, ui32 orderNumber); void AddNoDataResponse(const TLogoBlobID &id, ui32 orderNumber); void AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber); - void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber, bool keep, bool doNotKeep); + void AddNotYetResponse(const TLogoBlobID &id, ui32 orderNumber); EStrategyOutcome RunStrategies(TLogContext& logCtx, const TStackVec& strategies, TBatchedVec *finished = nullptr, const TBlobStorageGroupInfo::TGroupVDisks *expired = nullptr); EStrategyOutcome RunStrategy(TLogContext &logCtx, const IStrategy& s, TBatchedVec *finished = nullptr, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 067090088c38..3c9538fc244d 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -44,12 +44,14 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas const TBlobState &blobState = Blackboard.GetState(query.Id); outResponse.Id = query.Id; outResponse.PartMap = blobState.PartMap; - outResponse.Keep = blobState.Keep; - outResponse.DoNotKeep = blobState.DoNotKeep; outResponse.LooksLikePhantom = PhantomCheck ? std::make_optional(blobState.WholeSituation == TBlobState::ESituation::Absent) : std::nullopt; + // fill in keep/doNotKeep flags + const auto it = BlobFlags.find(query.Id); + std::tie(outResponse.Keep, outResponse.DoNotKeep) = it != BlobFlags.end() ? it->second : std::make_tuple(false, false); + if (blobState.WholeSituation == TBlobState::ESituation::Absent) { bool okay = true; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h index eefa4eeba50c..24ae81b8476c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h @@ -53,6 +53,8 @@ class TGetImpl { std::optional ReaderTabletData; + std::unordered_map> BlobFlags; // keep, doNotKeep per blob + public: TGetImpl(const TIntrusivePtr &info, const TIntrusivePtr &groupQueues, TEvBlobStorage::TEvGet *ev, TNodeLayoutInfoPtr&& nodeLayout, const TString& requestPrefix = {}) @@ -219,6 +221,12 @@ class TGetImpl { } } + if (result.HasKeep() || result.HasDoNotKeep()) { + auto& [a, b] = BlobFlags[blobId]; + a |= result.GetKeep(); + b |= result.GetDoNotKeep(); + } + if (replyStatus == NKikimrProto::OK) { // TODO(cthulhu): Verify shift and response size, and cookie R_LOG_DEBUG_SX(logCtx, "BPG58", "Got# OK orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString()); @@ -228,8 +236,7 @@ class TGetImpl { resultBuffer.ExtractFrontPlain(temp.GetDataMut(), temp.size()); resultBuffer.Insert(resultBuffer.End(), std::move(temp)); } - Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer), result.GetKeep(), - result.GetDoNotKeep()); + Blackboard.AddResponseData(blobId, orderNumber, resultShift, std::move(resultBuffer)); } else if (replyStatus == NKikimrProto::NODATA) { R_LOG_DEBUG_SX(logCtx, "BPG59", "Got# NODATA orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString()); @@ -243,7 +250,7 @@ class TGetImpl { } else if (replyStatus == NKikimrProto::NOT_YET) { R_LOG_DEBUG_SX(logCtx, "BPG67", "Got# NOT_YET orderNumber# " << orderNumber << " vDiskId# " << vdisk.ToString()); - Blackboard.AddNotYetResponse(blobId, orderNumber, result.GetKeep(), result.GetDoNotKeep()); + Blackboard.AddNotYetResponse(blobId, orderNumber); } else { Y_ABORT_UNLESS(false, "Unexpected reply status# %s", NKikimrProto::EReplyStatus_Name(replyStatus).data()); }