From aeea2b3f6cbc932642912331f1bf427ec4a202c2 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sat, 21 Dec 2024 16:48:33 +0300 Subject: [PATCH 1/5] accessors fetching control --- ydb/core/tx/columnshard/columnshard_impl.cpp | 4 +- .../data_accessor/abstract/collector.cpp | 9 +- .../data_accessor/abstract/collector.h | 20 +++- .../tx/columnshard/data_accessor/events.h | 5 +- .../data_accessor/in_mem/collector.cpp | 14 ++- .../data_accessor/in_mem/collector.h | 5 +- .../data_accessor/local_db/collector.cpp | 22 ++-- .../data_accessor/local_db/collector.h | 3 +- .../tx/columnshard/data_accessor/manager.cpp | 104 ++++++++++++++++++ .../tx/columnshard/data_accessor/manager.h | 66 ++--------- .../tx/columnshard/data_accessor/request.h | 2 +- 11 files changed, 168 insertions(+), 86 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 65e179c8c391..85dbbf14076c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -1422,13 +1422,13 @@ class TTxAskPortionChunks: public TTransactionBase { public: TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr& fetchCallback, - THashMap&& portions, const TString& consumer) + std::vector&& portions, const TString& consumer) : TBase(self) , FetchCallback(fetchCallback) , Consumer(consumer) { for (auto&& i : portions) { - PortionsByPath[i.second->GetPathId()].emplace_back(i.second); + PortionsByPath[i->GetPathId()].emplace_back(i); } } diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp index 97400ff1865e..18b138b607d7 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp @@ -5,10 +5,15 @@ namespace NKikimr::NOlap::NDataAccessorControl { -THashMap IGranuleDataAccessor::AskData( +void IGranuleDataAccessor::AskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) { AFL_VERIFY(portions.size()); - return DoAskData(portions, callback, consumer); + DoAskData(portions, callback, consumer); +} + +TDataCategorized IGranuleDataAccessor::AnalyzeData( + const std::vector& portions, const TString& consumer) { + return DoAnalyzeData(portions, consumer); } void TActorAccessorsCallback::OnAccessorsFetched(std::vector&& accessors) { diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h index f34f867e7a0d..1d0bdf041520 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h @@ -20,12 +20,27 @@ class TActorAccessorsCallback: public IAccessorCallback { } }; +class TDataCategorized { +private: + YDB_READONLY_DEF(std::vector, PortionsToAsk); + YDB_READONLY_DEF(std::vector, CachedAccessors); + +public: + void AddToAsk(const TPortionInfo::TConstPtr& p) { + PortionsToAsk.emplace_back(p); + } + void AddFromCache(const TPortionDataAccessor& accessor) { + CachedAccessors.emplace_back(accessor); + } +}; + class IGranuleDataAccessor { private: const ui64 PathId; - virtual THashMap DoAskData( + virtual void DoAskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) = 0; + virtual TDataCategorized DoAnalyzeData(const std::vector& portions, const TString& consumer) = 0; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) = 0; public: @@ -39,8 +54,9 @@ class IGranuleDataAccessor { : PathId(pathId) { } - THashMap AskData( + void AskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer); + TDataCategorized AnalyzeData(const std::vector& portions, const TString& consumer); void ModifyPortions(const std::vector& add, const std::vector& remove) { return DoModifyPortions(add, remove); } diff --git a/ydb/core/tx/columnshard/data_accessor/events.h b/ydb/core/tx/columnshard/data_accessor/events.h index 5f9c48ee9332..d5fb45aa42be 100644 --- a/ydb/core/tx/columnshard/data_accessor/events.h +++ b/ydb/core/tx/columnshard/data_accessor/events.h @@ -77,13 +77,12 @@ class TEvUnregisterController class TEvAskTabletDataAccessors: public NActors::TEventLocal { private: - using TPortions = THashMap; - YDB_ACCESSOR_DEF(TPortions, Portions); + YDB_ACCESSOR_DEF(std::vector, Portions); YDB_READONLY_DEF(std::shared_ptr, Callback); YDB_READONLY_DEF(TString, Consumer); public: - explicit TEvAskTabletDataAccessors(const THashMap& portions, + explicit TEvAskTabletDataAccessors(const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) : Portions(portions) , Callback(callback) diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp index 42a5558dc17a..a8a1003a045b 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp @@ -2,15 +2,19 @@ namespace NKikimr::NOlap::NDataAccessorControl::NInMem { -THashMap TCollector::DoAskData( +void TCollector::DoAskData( const std::vector& portions, const std::shared_ptr& /*callback*/, const TString& /*consumer*/) { - THashMap accessors; + AFL_VERIFY(portions.empty()); +} + +TDataCategorized TCollector::DoAnalyzeData(const std::vector& portions, const TString& /*consumer*/) { + TDataCategorized result; for (auto&& i : portions) { auto it = Accessors.find(i->GetPortionId()); AFL_VERIFY(it != Accessors.end()); - accessors.emplace(i->GetPortionId(), it->second); + result.AddFromCache(it->second); } - return accessors; + return result; } void TCollector::DoModifyPortions(const std::vector& add, const std::vector& remove) { @@ -22,4 +26,4 @@ void TCollector::DoModifyPortions(const std::vector& add, } } -} \ No newline at end of file +} // namespace NKikimr::NOlap::NDataAccessorControl::NInMem diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h index ead6b25ac23e..41ab570d7f25 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h @@ -6,8 +6,9 @@ class TCollector: public IGranuleDataAccessor { private: using TBase = IGranuleDataAccessor; THashMap Accessors; - virtual THashMap DoAskData( - const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) override; + virtual void DoAskData(const std::vector& portions, const std::shared_ptr& callback, + const TString& consumer) override; + virtual TDataCategorized DoAnalyzeData(const std::vector& portions, const TString& consumer) override; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) override; diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp index 08be63308d6a..2c00d0b3cbb6 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp @@ -3,23 +3,25 @@ #include namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB { -THashMap TCollector::DoAskData( +void TCollector::DoAskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) { - THashMap accessors; - THashMap portionsToDirectAsk; + if (portions.size()) { + NActors::TActivationContext::Send( + TabletActorId, std::make_unique(portions, callback, consumer)); + } +} + +TDataCategorized TCollector::DoAnalyzeData(const std::vector& portions, const TString& consumer) { + TDataCategorized result; for (auto&& p : portions) { auto it = AccessorsCache.Find(p->GetPortionId()); if (it != AccessorsCache.End() && it.Key() == p->GetPortionId()) { - accessors.emplace(p->GetPortionId(), it.Value()); + result.AddFromCache(it.Value()); } else { - portionsToDirectAsk.emplace(p->GetPortionId(), p); + result.AddToAsk(p); } } - if (portionsToDirectAsk.size()) { - NActors::TActivationContext::Send( - TabletActorId, std::make_unique(portionsToDirectAsk, callback, consumer)); - } - return accessors; + return result; } void TCollector::DoModifyPortions(const std::vector& add, const std::vector& remove) { diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h index d52ca722ff49..c879224f97fe 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h @@ -17,8 +17,9 @@ class TCollector: public IGranuleDataAccessor { TLRUCache AccessorsCache; using TBase = IGranuleDataAccessor; - virtual THashMap DoAskData(const std::vector& portions, + virtual void DoAskData(const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) override; + virtual TDataCategorized DoAnalyzeData(const std::vector& portions, const TString& consumer) override; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) override; public: diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index f3a4cde0ec8e..b543c3d020ac 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -2,4 +2,108 @@ namespace NKikimr::NOlap::NDataAccessorControl { +void TLocalManager::DrainQueue() { + THashMap> portionsToAsk; + std::optional lastPathId; + IGranuleDataAccessor* lastDataAccessor = nullptr; + ui32 countToFlight = 0; + while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { + while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { + auto p = PortionsAsk.front(); + if (!lastPathId || *lastPathId != p->GetPathId()) { + lastPathId = p->GetPathId(); + auto it = Managers.find(p->GetPathId()); + if (it == Managers.end()) { + lastDataAccessor = nullptr; + } else { + lastDataAccessor = it->second.get(); + } + } + PortionsAsk.pop_front(); + if (!lastDataAccessor) { + auto it = RequestsByPortion.find(p->GetPortionId()); + AFL_VERIFY(it != RequestsByPortion.end()); + for (auto&& i : it->second) { + if (!i->IsFetched()) { + i->AddError(p->GetPathId(), "path id absent"); + } + } + RequestsByPortion.erase(it); + } else { + portionsToAsk[p->GetPathId()].emplace_back(p); + ++countToFlight; + } + } + for (auto&& i : portionsToAsk) { + auto it = Managers.find(i.first); + AFL_VERIFY(it != Managers.end()); + auto dataAnalyzed = it->second->AnalyzeData(i.second, "ANALYZE"); + for (auto&& accessor : dataAnalyzed.GetCachedAccessors()) { + auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); + AFL_VERIFY(it != RequestsByPortion.end()); + for (auto&& i : it->second) { + if (!i->IsFetched()) { + i->AddAccessor(accessor); + } + } + RequestsByPortion.erase(it); + AFL_VERIFY(countToFlight); + --countToFlight; + } + if (dataAnalyzed.GetPortionsToAsk().size()) { + it->second->AskData(dataAnalyzed.GetPortionsToAsk(), AccessorCallback, "ANALYZE"); + } + } + } + PortionsAskInFlight += countToFlight; +} + +void TLocalManager::DoAskData(const std::shared_ptr& request) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString()); + for (auto&& pathId : request->GetPathIds()) { + auto portions = request->StartFetching(pathId); + for (auto&& [_, i] : portions) { + auto itRequest = RequestsByPortion.find(i->GetPortionId()); + if (itRequest == RequestsByPortion.end()) { + AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector>({request})).second); + PortionsAsk.emplace_back(i); + } else { + itRequest->second.emplace_back(request); + } + } + } + DrainQueue(); +} + +void TLocalManager::DoRegisterController(std::unique_ptr&& controller, const bool update) { + if (update) { + auto it = Managers.find(controller->GetPathId()); + if (it != Managers.end()) { + it->second = std::move(controller); + } + } else { + AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second); + } +} + +void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) { + { + auto it = Managers.find(accessor.GetPortionInfo().GetPathId()); + AFL_VERIFY(it != Managers.end()); + it->second->ModifyPortions({ accessor }, {}); + } + { + auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); + if (it != RequestsByPortion.end()) { + for (auto&& i : it->second) { + i->AddAccessor(accessor); + } + AFL_VERIFY(PortionsAskInFlight); + --PortionsAskInFlight; + } + RequestsByPortion.erase(it); + } + DrainQueue(); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/data_accessor/manager.h b/ydb/core/tx/columnshard/data_accessor/manager.h index 83e9155cea03..f3b7d76de9a5 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/manager.h @@ -95,67 +95,17 @@ class TLocalManager: public IDataAccessorsManager { THashMap>> RequestsByPortion; const std::shared_ptr AccessorCallback; - virtual void DoAskData(const std::shared_ptr& request) override { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString()); - for (auto&& i : request->GetPathIds()) { - auto it = Managers.find(i); - if (it == Managers.end()) { - request->AddError(i, "incorrect path id"); - } else { - auto portions = request->StartFetching(i); - std::vector portionsAsk; - for (auto&& [_, i] : portions) { - auto itRequest = RequestsByPortion.find(i->GetPortionId()); - if (itRequest == RequestsByPortion.end()) { - portionsAsk.emplace_back(i); - } else { - itRequest->second.emplace_back(request); - } - } - if (portionsAsk.empty()) { - continue; - } - auto accessors = it->second->AskData(portionsAsk, AccessorCallback, request->GetConsumer()); - for (auto&& p : portionsAsk) { - auto itAccessor = accessors.find(p->GetPortionId()); - if (itAccessor == accessors.end()) { - AFL_VERIFY(RequestsByPortion.emplace(p->GetPortionId(), std::vector>({request})).second); - } else { - request->AddAccessor(itAccessor->second); - } - } - } - } - } - virtual void DoRegisterController(std::unique_ptr&& controller, const bool update) override { - if (update) { - auto it = Managers.find(controller->GetPathId()); - if (it != Managers.end()) { - it->second = std::move(controller); - } - } else { - AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second); - } - } + std::deque PortionsAsk; + ui64 PortionsAskInFlight = 0; + + void DrainQueue(); + + virtual void DoAskData(const std::shared_ptr& request) override; + virtual void DoRegisterController(std::unique_ptr&& controller, const bool update) override; virtual void DoUnregisterController(const ui64 pathId) override { AFL_VERIFY(Managers.erase(pathId)); } - virtual void DoAddPortion(const TPortionDataAccessor& accessor) override { - { - auto it = Managers.find(accessor.GetPortionInfo().GetPathId()); - AFL_VERIFY(it != Managers.end()); - it->second->ModifyPortions({ accessor }, {}); - } - { - auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); - if (it != RequestsByPortion.end()) { - for (auto&& i : it->second) { - i->AddAccessor(accessor); - } - } - RequestsByPortion.erase(it); - } - } + virtual void DoAddPortion(const TPortionDataAccessor& accessor) override; virtual void DoRemovePortion(const TPortionInfo::TConstPtr& portionInfo) override { auto it = Managers.find(portionInfo->GetPathId()); AFL_VERIFY(it != Managers.end()); diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 31ab85cd6578..8a9f20b68879 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -50,7 +50,7 @@ class TDataAccessorsResult: private NNonCopyable::TMoveOnly { } void AddError(const ui64 pathId, const TString& errorMessage) { - AFL_VERIFY(ErrorsByPathId.emplace(pathId, errorMessage).second); + ErrorsByPathId.emplace(pathId, errorMessage); } bool HasErrors() const { From b3ee0da9a274e7488ee581ce7ab0b68a75a86bf6 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 22 Dec 2024 11:55:47 +0300 Subject: [PATCH 2/5] running control --- .../tx/columnshard/data_accessor/manager.cpp | 10 +++++++--- .../tx/columnshard/data_accessor/manager.h | 18 +++++++++++++++++- .../tx/columnshard/data_accessor/request.h | 12 ++++++++++++ .../engines/reader/abstract/read_context.h | 17 +++++++++++++++-- .../reader/common_reader/iterator/context.h | 7 +++---- .../reader/plain_reader/iterator/source.cpp | 3 +++ .../reader/simple_reader/iterator/source.cpp | 5 ++++- .../engines/reader/sys_view/chunks/chunks.h | 3 +++ 8 files changed, 64 insertions(+), 11 deletions(-) diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index b543c3d020ac..028531f0d717 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -9,7 +9,12 @@ void TLocalManager::DrainQueue() { ui32 countToFlight = 0; while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { - auto p = PortionsAsk.front(); + if (PortionsAsk.front().GetActivityFlag() && !PortionsAsk.front().GetActivityFlag()->Val()) { + PortionsAsk.pop_front(); + continue; + } + auto p = PortionsAsk.front().ExtractPortion(); + PortionsAsk.pop_front(); if (!lastPathId || *lastPathId != p->GetPathId()) { lastPathId = p->GetPathId(); auto it = Managers.find(p->GetPathId()); @@ -19,7 +24,6 @@ void TLocalManager::DrainQueue() { lastDataAccessor = it->second.get(); } } - PortionsAsk.pop_front(); if (!lastDataAccessor) { auto it = RequestsByPortion.find(p->GetPortionId()); AFL_VERIFY(it != RequestsByPortion.end()); @@ -66,7 +70,7 @@ void TLocalManager::DoAskData(const std::shared_ptr& requ auto itRequest = RequestsByPortion.find(i->GetPortionId()); if (itRequest == RequestsByPortion.end()) { AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector>({request})).second); - PortionsAsk.emplace_back(i); + PortionsAsk.emplace_back(i, request->GetActivityFlag()); } else { itRequest->second.emplace_back(request); } diff --git a/ydb/core/tx/columnshard/data_accessor/manager.h b/ydb/core/tx/columnshard/data_accessor/manager.h index f3b7d76de9a5..f6f23bc73d11 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/manager.h @@ -95,7 +95,23 @@ class TLocalManager: public IDataAccessorsManager { THashMap>> RequestsByPortion; const std::shared_ptr AccessorCallback; - std::deque PortionsAsk; + class TPortionToAsk { + private: + TPortionInfo::TConstPtr Portion; + YDB_READONLY_DEF(std::shared_ptr, ActivityFlag); + + public: + TPortionToAsk(const TPortionInfo::TConstPtr& portion, const std::shared_ptr& activityFlag) + : Portion(portion) + , ActivityFlag(activityFlag) { + } + + TPortionInfo::TConstPtr ExtractPortion() { + return std::move(Portion); + } + }; + + std::deque PortionsAsk; ui64 PortionsAskInFlight = 0; void DrainQueue(); diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 8a9f20b68879..09e4d3a329ff 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -63,6 +63,7 @@ class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCo THashSet RequestIds; virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) = 0; + virtual const std::shared_ptr& DoGetActivityFlag() const = 0; void OnRequestsFinished(TDataAccessorsResult&& result) { DoOnRequestsFinished(std::move(result)); @@ -85,12 +86,18 @@ class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCo OnRequestsFinished(std::move(*Result)); } } + const std::shared_ptr& GetActivityFlag() const { + return DoGetActivityFlag(); + } virtual ~IDataAccessorRequestsSubscriber() = default; }; class TFakeDataAccessorsSubscriber: public IDataAccessorRequestsSubscriber { private: + virtual const std::shared_ptr& DoGetActivityFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& /*result*/) override { } }; @@ -225,6 +232,11 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter& GetActivityFlag() const { + AFL_VERIFY(HasSubscriber()); + return Subscriber->GetActivityFlag(); + } + bool HasSubscriber() const { return !!Subscriber; } diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index 6b4666ceae7e..970e5b329a1c 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -53,7 +53,8 @@ class TReadContext { const TActorId ResourceSubscribeActorId; const TActorId ReadCoordinatorActorId; const TComputeShardingPolicy ComputeShardingPolicy; - TAtomic AbortFlag = 0; + std::shared_ptr ActivityFlag = std::make_shared(1); + public: template std::shared_ptr GetReadMetadataPtrVerifiedAs() const { @@ -66,13 +67,25 @@ class TReadContext { return ReadMetadata->GetScanCursor(); } + const std::shared_ptr& GetActivityFlag() const { + return ActivityFlag; + } + void AbortWithError(const TString& errorMessage) { - if (AtomicCas(&AbortFlag, 1, 0)) { + if (AtomicCas(&*ActivityFlag, 0, 1)) { NActors::TActivationContext::Send( ScanActorId, std::make_unique(TConclusionStatus::Fail(errorMessage))); } } + void Stop() { + AtomicCas(ActivityFlag.get(), 0, 1); + } + + bool IsActive() const { + return AtomicGet(*ActivityFlag); + } + bool IsReverse() const { return ReadMetadata->IsDescSorted(); } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h index 9f2ae6d4bcba..7234bd299ee4 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h @@ -33,7 +33,6 @@ class TSpecialReadContext { YDB_READONLY_DEF(std::shared_ptr, FetchingStageMemory); TReadMetadata::TConstPtr ReadMetadata; - TAtomic AbortFlag = 0; virtual std::shared_ptr DoGetColumnsFetchingPlan(const std::shared_ptr& source) = 0; @@ -67,12 +66,12 @@ class TSpecialReadContext { return ProcessMemoryGuard->GetProcessId(); } - bool IsAborted() const { - return AtomicGet(AbortFlag); + bool IsActive() const { + return CommonContext->IsActive(); } void Abort() { - AtomicSet(AbortFlag, 1); + CommonContext->Stop(); } virtual ~TSpecialReadContext() { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 9a9bd23c8571..347baa8b1624 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -232,6 +232,9 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber TFetchingScriptCursor Step; std::shared_ptr Source; const NColumnShard::TCounterGuard Guard; + virtual const std::shared_ptr& DoGetActivityFlag() const override { + return Source->GetContext()->GetCommonContext()->GetActivityFlag(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { AFL_VERIFY(!result.HasErrors()); AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size()); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index af65ff1a8fc6..e3e1da180af9 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -234,6 +234,10 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber TFetchingScriptCursor Step; std::shared_ptr Source; const NColumnShard::TCounterGuard Guard; + virtual const std::shared_ptr& DoGetActivityFlag() const override { + return Source->GetContext()->GetCommonContext()->GetActivityFlag(); + } + virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { AFL_VERIFY(!result.HasErrors()); AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size()); @@ -243,7 +247,6 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber auto task = std::make_shared(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } - public: TPortionAccessorFetchingSubscriber(const TFetchingScriptCursor& step, const std::shared_ptr& source) : Step(step) diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h index 65afefa13fe1..4e5d1a431bd1 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h @@ -107,6 +107,9 @@ class TStatsIterator: public NAbstract::TStatsIterator Context; + virtual const std::shared_ptr& DoGetActivityFlag() const override { + return Context->GetActivityFlag(); + } virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& /*selfPtr*/) override { Guard = std::move(guard); From fadce02062437f293ab2d7cd5456abef60f10460 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 22 Dec 2024 13:06:23 +0300 Subject: [PATCH 3/5] fixes --- ydb/core/tx/columnshard/columnshard_impl.cpp | 3 +++ ydb/core/tx/columnshard/data_accessor/manager.cpp | 2 +- ydb/core/tx/columnshard/data_accessor/manager.h | 6 +++--- ydb/core/tx/columnshard/data_accessor/request.h | 14 +++++++------- .../engines/reader/abstract/read_context.h | 15 ++++++++------- .../reader/common_reader/iterator/context.h | 2 +- .../reader/plain_reader/iterator/source.cpp | 4 ++-- .../simple_reader/iterator/plain_read_data.h | 4 ++-- .../reader/simple_reader/iterator/scanner.cpp | 4 ++-- .../reader/simple_reader/iterator/source.cpp | 4 ++-- .../engines/reader/sys_view/chunks/chunks.h | 4 ++-- 11 files changed, 33 insertions(+), 29 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 85dbbf14076c..f75573475f76 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -619,6 +619,9 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber { private: std::shared_ptr ResourcesGuard; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return nullptr; + } virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final { AFL_VERIFY(ResourcesGuard); diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index 028531f0d717..f2c4330aa582 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -9,7 +9,7 @@ void TLocalManager::DrainQueue() { ui32 countToFlight = 0; while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { - if (PortionsAsk.front().GetActivityFlag() && !PortionsAsk.front().GetActivityFlag()->Val()) { + if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) { PortionsAsk.pop_front(); continue; } diff --git a/ydb/core/tx/columnshard/data_accessor/manager.h b/ydb/core/tx/columnshard/data_accessor/manager.h index f6f23bc73d11..d4bbefa60e4d 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/manager.h @@ -98,12 +98,12 @@ class TLocalManager: public IDataAccessorsManager { class TPortionToAsk { private: TPortionInfo::TConstPtr Portion; - YDB_READONLY_DEF(std::shared_ptr, ActivityFlag); + YDB_READONLY_DEF(std::shared_ptr, AbortionFlag); public: - TPortionToAsk(const TPortionInfo::TConstPtr& portion, const std::shared_ptr& activityFlag) + TPortionToAsk(const TPortionInfo::TConstPtr& portion, const std::shared_ptr& abortionFlag) : Portion(portion) - , ActivityFlag(activityFlag) { + , AbortionFlag(abortionFlag) { } TPortionInfo::TConstPtr ExtractPortion() { diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 09e4d3a329ff..2d5f29ad2040 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -63,7 +63,7 @@ class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCo THashSet RequestIds; virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) = 0; - virtual const std::shared_ptr& DoGetActivityFlag() const = 0; + virtual const std::shared_ptr& DoGetAbortionFlag() const = 0; void OnRequestsFinished(TDataAccessorsResult&& result) { DoOnRequestsFinished(std::move(result)); @@ -86,8 +86,8 @@ class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCo OnRequestsFinished(std::move(*Result)); } } - const std::shared_ptr& GetActivityFlag() const { - return DoGetActivityFlag(); + const std::shared_ptr& GetAbortionFlag() const { + return DoGetAbortionFlag(); } virtual ~IDataAccessorRequestsSubscriber() = default; @@ -95,8 +95,8 @@ class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCo class TFakeDataAccessorsSubscriber: public IDataAccessorRequestsSubscriber { private: - virtual const std::shared_ptr& DoGetActivityFlag() const override { - return Default>(); + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); } virtual void DoOnRequestsFinished(TDataAccessorsResult&& /*result*/) override { } @@ -232,9 +232,9 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter& GetActivityFlag() const { + const std::shared_ptr& GetAbortionFlag() const { AFL_VERIFY(HasSubscriber()); - return Subscriber->GetActivityFlag(); + return Subscriber->GetAbortionFlag(); } bool HasSubscriber() const { diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index 970e5b329a1c..c933f4cf703f 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -53,7 +53,7 @@ class TReadContext { const TActorId ResourceSubscribeActorId; const TActorId ReadCoordinatorActorId; const TComputeShardingPolicy ComputeShardingPolicy; - std::shared_ptr ActivityFlag = std::make_shared(1); + std::shared_ptr AbortionFlag = std::make_shared(0); public: template @@ -67,23 +67,23 @@ class TReadContext { return ReadMetadata->GetScanCursor(); } - const std::shared_ptr& GetActivityFlag() const { - return ActivityFlag; + const std::shared_ptr& GetAbortionFlag() const { + return AbortionFlag; } void AbortWithError(const TString& errorMessage) { - if (AtomicCas(&*ActivityFlag, 0, 1)) { + if (AbortionFlag->Inc() == 1)) { NActors::TActivationContext::Send( ScanActorId, std::make_unique(TConclusionStatus::Fail(errorMessage))); } } void Stop() { - AtomicCas(ActivityFlag.get(), 0, 1); + AbortionFlag->Inc(); } bool IsActive() const { - return AtomicGet(*ActivityFlag); + return AbortionFlag->Val() == 0; } bool IsReverse() const { @@ -140,7 +140,8 @@ class TReadContext { , ScanActorId(scanActorId) , ResourceSubscribeActorId(resourceSubscribeActorId) , ReadCoordinatorActorId(readCoordinatorActorId) - , ComputeShardingPolicy(computeShardingPolicy) { + , ComputeShardingPolicy(computeShardingPolicy) + { Y_ABORT_UNLESS(ReadMetadata); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h index 7234bd299ee4..0c4fd5350b35 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h @@ -67,7 +67,7 @@ class TSpecialReadContext { } bool IsActive() const { - return CommonContext->IsActive(); + return !CommonContext->IsAborted(); } void Abort() { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 347baa8b1624..0e0e5e7695b2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -232,8 +232,8 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber TFetchingScriptCursor Step; std::shared_ptr Source; const NColumnShard::TCounterGuard Guard; - virtual const std::shared_ptr& DoGetActivityFlag() const override { - return Source->GetContext()->GetCommonContext()->GetActivityFlag(); + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Source->GetContext()->GetCommonContext()->GetAbortionFlag(); } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { AFL_VERIFY(!result.HasErrors()); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h index 08ec74361360..adfe861d6319 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h @@ -61,7 +61,7 @@ class TPlainReadData: public IDataReader, TNonCopyable, NColumnShard::TMonitorin return *Scanner; } virtual void OnSentDataFromInterval(const ui32 sourceIdx) const override { - if (SpecialReadContext->IsAborted()) { + if (!SpecialReadContext->IsActive()) { return; } Scanner->ContinueSource(sourceIdx); @@ -71,7 +71,7 @@ class TPlainReadData: public IDataReader, TNonCopyable, NColumnShard::TMonitorin TPlainReadData(const std::shared_ptr& context); ~TPlainReadData() { - if (!SpecialReadContext->IsAborted()) { + if (SpecialReadContext->IsActive()) { Abort("unexpected on destructor"); } } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index c43890afe0b4..bc4e34df7f17 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -116,7 +116,7 @@ TScanHead::TScanHead(std::deque>&& sources, const s } TConclusion TScanHead::BuildNextInterval() { - if (Context->IsAborted()) { + if (!Context->IsActive()) { return false; } bool changed = false; @@ -139,7 +139,7 @@ bool TScanHead::IsReverse() const { } void TScanHead::Abort() { - AFL_VERIFY(Context->IsAborted()); + AFL_VERIFY(!Context->IsActive()); for (auto&& i : FetchingSources) { i->Abort(); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index e3e1da180af9..4281309386e2 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -234,8 +234,8 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber TFetchingScriptCursor Step; std::shared_ptr Source; const NColumnShard::TCounterGuard Guard; - virtual const std::shared_ptr& DoGetActivityFlag() const override { - return Source->GetContext()->GetCommonContext()->GetActivityFlag(); + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Source->GetContext()->GetCommonContext()->GetAbortionFlag(); } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h index 4e5d1a431bd1..3e5975eec8a5 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h @@ -107,8 +107,8 @@ class TStatsIterator: public NAbstract::TStatsIterator Context; - virtual const std::shared_ptr& DoGetActivityFlag() const override { - return Context->GetActivityFlag(); + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Context->GetAbortionFlag(); } virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& /*selfPtr*/) override { From 425a1b16ba9787a9d4c637562e4f2a842085dfcb Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 22 Dec 2024 13:39:42 +0300 Subject: [PATCH 4/5] correction --- ydb/core/tx/columnshard/columnshard__statistics.cpp | 3 +++ ydb/core/tx/columnshard/columnshard_impl.cpp | 4 ++-- ydb/core/tx/columnshard/data_accessor/manager.cpp | 2 +- .../columnshard/engines/reader/abstract/read_context.h | 9 +++++++-- .../engines/reader/common_reader/iterator/context.h | 4 ++++ .../engines/reader/sys_view/chunks/chunks.cpp | 4 ++++ .../columnshard/engines/reader/sys_view/chunks/chunks.h | 4 +--- 7 files changed, 22 insertions(+), 8 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__statistics.cpp b/ydb/core/tx/columnshard/columnshard__statistics.cpp index 928570b22d22..c482b73a7268 100644 --- a/ydb/core/tx/columnshard/columnshard__statistics.cpp +++ b/ydb/core/tx/columnshard/columnshard__statistics.cpp @@ -133,6 +133,9 @@ class TColumnPortionsAccumulator { const std::shared_ptr Result; std::shared_ptr VersionedIndex; const std::set ColumnTagsRequested; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override { THashMap> sketchesByColumns; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index f75573475f76..f68cc93dd473 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -619,8 +619,8 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber { private: std::shared_ptr ResourcesGuard; - virtual const std::shared_ptr& DoGetAbortionFlag() const override { - return nullptr; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); } virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final { diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index f2c4330aa582..182f8d81fbed 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -70,7 +70,7 @@ void TLocalManager::DoAskData(const std::shared_ptr& requ auto itRequest = RequestsByPortion.find(i->GetPortionId()); if (itRequest == RequestsByPortion.end()) { AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector>({request})).second); - PortionsAsk.emplace_back(i, request->GetActivityFlag()); + PortionsAsk.emplace_back(i, request->GetAbortionFlag()); } else { itRequest->second.emplace_back(request); } diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index c933f4cf703f..f9cb5dac128a 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -54,6 +54,7 @@ class TReadContext { const TActorId ReadCoordinatorActorId; const TComputeShardingPolicy ComputeShardingPolicy; std::shared_ptr AbortionFlag = std::make_shared(0); + std::shared_ptr ConstAbortionFlag = AbortionFlag; public: template @@ -68,11 +69,11 @@ class TReadContext { } const std::shared_ptr& GetAbortionFlag() const { - return AbortionFlag; + return ConstAbortionFlag; } void AbortWithError(const TString& errorMessage) { - if (AbortionFlag->Inc() == 1)) { + if (AbortionFlag->Inc() == 1) { NActors::TActivationContext::Send( ScanActorId, std::make_unique(TConclusionStatus::Fail(errorMessage))); } @@ -86,6 +87,10 @@ class TReadContext { return AbortionFlag->Val() == 0; } + bool IsAborted() const { + return AbortionFlag->Val(); + } + bool IsReverse() const { return ReadMetadata->IsDescSorted(); } diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h index 0c4fd5350b35..d0376d74d296 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h @@ -70,6 +70,10 @@ class TSpecialReadContext { return !CommonContext->IsAborted(); } + bool IsAborted() const { + return CommonContext->IsAborted(); + } + void Abort() { CommonContext->Stop(); } diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp index fb238100d264..92fdf0689850 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp @@ -220,4 +220,8 @@ void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage); } +const std::shared_ptr& TStatsIterator::TFetchingAccessorAllocation::DoGetAbortionFlag() const { + return Context->GetAbortionFlag(); +} + } // namespace NKikimr::NOlap::NReader::NSysView::NChunks diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h index 3e5975eec8a5..c09a4f6d448b 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h @@ -107,9 +107,7 @@ class TStatsIterator: public NAbstract::TStatsIterator Context; - virtual const std::shared_ptr& DoGetAbortionFlag() const override { - return Context->GetAbortionFlag(); - } + virtual const std::shared_ptr& DoGetAbortionFlag() const override; virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& /*selfPtr*/) override { Guard = std::move(guard); From 100adf5f3b37293ad14c065a3c5b7a0a526f0968 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sun, 22 Dec 2024 14:43:32 +0300 Subject: [PATCH 5/5] fix --- ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 9635c24b3325..3f3a035f6817 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -358,6 +358,9 @@ class TTestCompactionAccessorsSubscriber: public NOlap::IDataAccessorRequestsSub private: std::shared_ptr Changes; const std::shared_ptr VersionedIndex; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { const TDataAccessorsInitializationContext context(VersionedIndex); @@ -438,6 +441,9 @@ class TTestMetadataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubsc std::shared_ptr Processor; TColumnEngineForLogs& Engine; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { Processor->ApplyResult( NOlap::NResourceBroker::NSubscribe::TResourceContainer::BuildForTest(std::move(result)), Engine);