diff --git a/ydb/core/tx/columnshard/columnshard__statistics.cpp b/ydb/core/tx/columnshard/columnshard__statistics.cpp index 928570b22d22..c482b73a7268 100644 --- a/ydb/core/tx/columnshard/columnshard__statistics.cpp +++ b/ydb/core/tx/columnshard/columnshard__statistics.cpp @@ -133,6 +133,9 @@ class TColumnPortionsAccumulator { const std::shared_ptr Result; std::shared_ptr VersionedIndex; const std::set ColumnTagsRequested; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override { THashMap> sketchesByColumns; diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 65e179c8c391..f68cc93dd473 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -619,6 +619,9 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask { class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber { private: std::shared_ptr ResourcesGuard; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final { AFL_VERIFY(ResourcesGuard); @@ -1422,13 +1425,13 @@ class TTxAskPortionChunks: public TTransactionBase { public: TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr& fetchCallback, - THashMap&& portions, const TString& consumer) + std::vector&& portions, const TString& consumer) : TBase(self) , FetchCallback(fetchCallback) , Consumer(consumer) { for (auto&& i : portions) { - PortionsByPath[i.second->GetPathId()].emplace_back(i.second); + PortionsByPath[i->GetPathId()].emplace_back(i); } } diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp index 97400ff1865e..18b138b607d7 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp @@ -5,10 +5,15 @@ namespace NKikimr::NOlap::NDataAccessorControl { -THashMap IGranuleDataAccessor::AskData( +void IGranuleDataAccessor::AskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) { AFL_VERIFY(portions.size()); - return DoAskData(portions, callback, consumer); + DoAskData(portions, callback, consumer); +} + +TDataCategorized IGranuleDataAccessor::AnalyzeData( + const std::vector& portions, const TString& consumer) { + return DoAnalyzeData(portions, consumer); } void TActorAccessorsCallback::OnAccessorsFetched(std::vector&& accessors) { diff --git a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h index f34f867e7a0d..1d0bdf041520 100644 --- a/ydb/core/tx/columnshard/data_accessor/abstract/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/abstract/collector.h @@ -20,12 +20,27 @@ class TActorAccessorsCallback: public IAccessorCallback { } }; +class TDataCategorized { +private: + YDB_READONLY_DEF(std::vector, PortionsToAsk); + YDB_READONLY_DEF(std::vector, CachedAccessors); + +public: + void AddToAsk(const TPortionInfo::TConstPtr& p) { + PortionsToAsk.emplace_back(p); + } + void AddFromCache(const TPortionDataAccessor& accessor) { + CachedAccessors.emplace_back(accessor); + } +}; + class IGranuleDataAccessor { private: const ui64 PathId; - virtual THashMap DoAskData( + virtual void DoAskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) = 0; + virtual TDataCategorized DoAnalyzeData(const std::vector& portions, const TString& consumer) = 0; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) = 0; public: @@ -39,8 +54,9 @@ class IGranuleDataAccessor { : PathId(pathId) { } - THashMap AskData( + void AskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer); + TDataCategorized AnalyzeData(const std::vector& portions, const TString& consumer); void ModifyPortions(const std::vector& add, const std::vector& remove) { return DoModifyPortions(add, remove); } diff --git a/ydb/core/tx/columnshard/data_accessor/events.h b/ydb/core/tx/columnshard/data_accessor/events.h index 5f9c48ee9332..d5fb45aa42be 100644 --- a/ydb/core/tx/columnshard/data_accessor/events.h +++ b/ydb/core/tx/columnshard/data_accessor/events.h @@ -77,13 +77,12 @@ class TEvUnregisterController class TEvAskTabletDataAccessors: public NActors::TEventLocal { private: - using TPortions = THashMap; - YDB_ACCESSOR_DEF(TPortions, Portions); + YDB_ACCESSOR_DEF(std::vector, Portions); YDB_READONLY_DEF(std::shared_ptr, Callback); YDB_READONLY_DEF(TString, Consumer); public: - explicit TEvAskTabletDataAccessors(const THashMap& portions, + explicit TEvAskTabletDataAccessors(const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) : Portions(portions) , Callback(callback) diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp index 42a5558dc17a..a8a1003a045b 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp @@ -2,15 +2,19 @@ namespace NKikimr::NOlap::NDataAccessorControl::NInMem { -THashMap TCollector::DoAskData( +void TCollector::DoAskData( const std::vector& portions, const std::shared_ptr& /*callback*/, const TString& /*consumer*/) { - THashMap accessors; + AFL_VERIFY(portions.empty()); +} + +TDataCategorized TCollector::DoAnalyzeData(const std::vector& portions, const TString& /*consumer*/) { + TDataCategorized result; for (auto&& i : portions) { auto it = Accessors.find(i->GetPortionId()); AFL_VERIFY(it != Accessors.end()); - accessors.emplace(i->GetPortionId(), it->second); + result.AddFromCache(it->second); } - return accessors; + return result; } void TCollector::DoModifyPortions(const std::vector& add, const std::vector& remove) { @@ -22,4 +26,4 @@ void TCollector::DoModifyPortions(const std::vector& add, } } -} \ No newline at end of file +} // namespace NKikimr::NOlap::NDataAccessorControl::NInMem diff --git a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h index ead6b25ac23e..41ab570d7f25 100644 --- a/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/in_mem/collector.h @@ -6,8 +6,9 @@ class TCollector: public IGranuleDataAccessor { private: using TBase = IGranuleDataAccessor; THashMap Accessors; - virtual THashMap DoAskData( - const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) override; + virtual void DoAskData(const std::vector& portions, const std::shared_ptr& callback, + const TString& consumer) override; + virtual TDataCategorized DoAnalyzeData(const std::vector& portions, const TString& consumer) override; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) override; diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp index 08be63308d6a..2c00d0b3cbb6 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp @@ -3,23 +3,25 @@ #include namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB { -THashMap TCollector::DoAskData( +void TCollector::DoAskData( const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) { - THashMap accessors; - THashMap portionsToDirectAsk; + if (portions.size()) { + NActors::TActivationContext::Send( + TabletActorId, std::make_unique(portions, callback, consumer)); + } +} + +TDataCategorized TCollector::DoAnalyzeData(const std::vector& portions, const TString& consumer) { + TDataCategorized result; for (auto&& p : portions) { auto it = AccessorsCache.Find(p->GetPortionId()); if (it != AccessorsCache.End() && it.Key() == p->GetPortionId()) { - accessors.emplace(p->GetPortionId(), it.Value()); + result.AddFromCache(it.Value()); } else { - portionsToDirectAsk.emplace(p->GetPortionId(), p); + result.AddToAsk(p); } } - if (portionsToDirectAsk.size()) { - NActors::TActivationContext::Send( - TabletActorId, std::make_unique(portionsToDirectAsk, callback, consumer)); - } - return accessors; + return result; } void TCollector::DoModifyPortions(const std::vector& add, const std::vector& remove) { diff --git a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h index d52ca722ff49..c879224f97fe 100644 --- a/ydb/core/tx/columnshard/data_accessor/local_db/collector.h +++ b/ydb/core/tx/columnshard/data_accessor/local_db/collector.h @@ -17,8 +17,9 @@ class TCollector: public IGranuleDataAccessor { TLRUCache AccessorsCache; using TBase = IGranuleDataAccessor; - virtual THashMap DoAskData(const std::vector& portions, + virtual void DoAskData(const std::vector& portions, const std::shared_ptr& callback, const TString& consumer) override; + virtual TDataCategorized DoAnalyzeData(const std::vector& portions, const TString& consumer) override; virtual void DoModifyPortions(const std::vector& add, const std::vector& remove) override; public: diff --git a/ydb/core/tx/columnshard/data_accessor/manager.cpp b/ydb/core/tx/columnshard/data_accessor/manager.cpp index f3a4cde0ec8e..182f8d81fbed 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.cpp +++ b/ydb/core/tx/columnshard/data_accessor/manager.cpp @@ -2,4 +2,112 @@ namespace NKikimr::NOlap::NDataAccessorControl { +void TLocalManager::DrainQueue() { + THashMap> portionsToAsk; + std::optional lastPathId; + IGranuleDataAccessor* lastDataAccessor = nullptr; + ui32 countToFlight = 0; + while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { + while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) { + if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) { + PortionsAsk.pop_front(); + continue; + } + auto p = PortionsAsk.front().ExtractPortion(); + PortionsAsk.pop_front(); + if (!lastPathId || *lastPathId != p->GetPathId()) { + lastPathId = p->GetPathId(); + auto it = Managers.find(p->GetPathId()); + if (it == Managers.end()) { + lastDataAccessor = nullptr; + } else { + lastDataAccessor = it->second.get(); + } + } + if (!lastDataAccessor) { + auto it = RequestsByPortion.find(p->GetPortionId()); + AFL_VERIFY(it != RequestsByPortion.end()); + for (auto&& i : it->second) { + if (!i->IsFetched()) { + i->AddError(p->GetPathId(), "path id absent"); + } + } + RequestsByPortion.erase(it); + } else { + portionsToAsk[p->GetPathId()].emplace_back(p); + ++countToFlight; + } + } + for (auto&& i : portionsToAsk) { + auto it = Managers.find(i.first); + AFL_VERIFY(it != Managers.end()); + auto dataAnalyzed = it->second->AnalyzeData(i.second, "ANALYZE"); + for (auto&& accessor : dataAnalyzed.GetCachedAccessors()) { + auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); + AFL_VERIFY(it != RequestsByPortion.end()); + for (auto&& i : it->second) { + if (!i->IsFetched()) { + i->AddAccessor(accessor); + } + } + RequestsByPortion.erase(it); + AFL_VERIFY(countToFlight); + --countToFlight; + } + if (dataAnalyzed.GetPortionsToAsk().size()) { + it->second->AskData(dataAnalyzed.GetPortionsToAsk(), AccessorCallback, "ANALYZE"); + } + } + } + PortionsAskInFlight += countToFlight; +} + +void TLocalManager::DoAskData(const std::shared_ptr& request) { + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString()); + for (auto&& pathId : request->GetPathIds()) { + auto portions = request->StartFetching(pathId); + for (auto&& [_, i] : portions) { + auto itRequest = RequestsByPortion.find(i->GetPortionId()); + if (itRequest == RequestsByPortion.end()) { + AFL_VERIFY(RequestsByPortion.emplace(i->GetPortionId(), std::vector>({request})).second); + PortionsAsk.emplace_back(i, request->GetAbortionFlag()); + } else { + itRequest->second.emplace_back(request); + } + } + } + DrainQueue(); +} + +void TLocalManager::DoRegisterController(std::unique_ptr&& controller, const bool update) { + if (update) { + auto it = Managers.find(controller->GetPathId()); + if (it != Managers.end()) { + it->second = std::move(controller); + } + } else { + AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second); + } +} + +void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) { + { + auto it = Managers.find(accessor.GetPortionInfo().GetPathId()); + AFL_VERIFY(it != Managers.end()); + it->second->ModifyPortions({ accessor }, {}); + } + { + auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); + if (it != RequestsByPortion.end()) { + for (auto&& i : it->second) { + i->AddAccessor(accessor); + } + AFL_VERIFY(PortionsAskInFlight); + --PortionsAskInFlight; + } + RequestsByPortion.erase(it); + } + DrainQueue(); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/data_accessor/manager.h b/ydb/core/tx/columnshard/data_accessor/manager.h index 83e9155cea03..d4bbefa60e4d 100644 --- a/ydb/core/tx/columnshard/data_accessor/manager.h +++ b/ydb/core/tx/columnshard/data_accessor/manager.h @@ -95,67 +95,33 @@ class TLocalManager: public IDataAccessorsManager { THashMap>> RequestsByPortion; const std::shared_ptr AccessorCallback; - virtual void DoAskData(const std::shared_ptr& request) override { - AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString()); - for (auto&& i : request->GetPathIds()) { - auto it = Managers.find(i); - if (it == Managers.end()) { - request->AddError(i, "incorrect path id"); - } else { - auto portions = request->StartFetching(i); - std::vector portionsAsk; - for (auto&& [_, i] : portions) { - auto itRequest = RequestsByPortion.find(i->GetPortionId()); - if (itRequest == RequestsByPortion.end()) { - portionsAsk.emplace_back(i); - } else { - itRequest->second.emplace_back(request); - } - } - if (portionsAsk.empty()) { - continue; - } - auto accessors = it->second->AskData(portionsAsk, AccessorCallback, request->GetConsumer()); - for (auto&& p : portionsAsk) { - auto itAccessor = accessors.find(p->GetPortionId()); - if (itAccessor == accessors.end()) { - AFL_VERIFY(RequestsByPortion.emplace(p->GetPortionId(), std::vector>({request})).second); - } else { - request->AddAccessor(itAccessor->second); - } - } - } + class TPortionToAsk { + private: + TPortionInfo::TConstPtr Portion; + YDB_READONLY_DEF(std::shared_ptr, AbortionFlag); + + public: + TPortionToAsk(const TPortionInfo::TConstPtr& portion, const std::shared_ptr& abortionFlag) + : Portion(portion) + , AbortionFlag(abortionFlag) { } - } - virtual void DoRegisterController(std::unique_ptr&& controller, const bool update) override { - if (update) { - auto it = Managers.find(controller->GetPathId()); - if (it != Managers.end()) { - it->second = std::move(controller); - } - } else { - AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second); + + TPortionInfo::TConstPtr ExtractPortion() { + return std::move(Portion); } - } + }; + + std::deque PortionsAsk; + ui64 PortionsAskInFlight = 0; + + void DrainQueue(); + + virtual void DoAskData(const std::shared_ptr& request) override; + virtual void DoRegisterController(std::unique_ptr&& controller, const bool update) override; virtual void DoUnregisterController(const ui64 pathId) override { AFL_VERIFY(Managers.erase(pathId)); } - virtual void DoAddPortion(const TPortionDataAccessor& accessor) override { - { - auto it = Managers.find(accessor.GetPortionInfo().GetPathId()); - AFL_VERIFY(it != Managers.end()); - it->second->ModifyPortions({ accessor }, {}); - } - { - auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId()); - if (it != RequestsByPortion.end()) { - for (auto&& i : it->second) { - i->AddAccessor(accessor); - } - } - RequestsByPortion.erase(it); - } - } + virtual void DoAddPortion(const TPortionDataAccessor& accessor) override; virtual void DoRemovePortion(const TPortionInfo::TConstPtr& portionInfo) override { auto it = Managers.find(portionInfo->GetPathId()); AFL_VERIFY(it != Managers.end()); diff --git a/ydb/core/tx/columnshard/data_accessor/request.h b/ydb/core/tx/columnshard/data_accessor/request.h index 31ab85cd6578..2d5f29ad2040 100644 --- a/ydb/core/tx/columnshard/data_accessor/request.h +++ b/ydb/core/tx/columnshard/data_accessor/request.h @@ -50,7 +50,7 @@ class TDataAccessorsResult: private NNonCopyable::TMoveOnly { } void AddError(const ui64 pathId, const TString& errorMessage) { - AFL_VERIFY(ErrorsByPathId.emplace(pathId, errorMessage).second); + ErrorsByPathId.emplace(pathId, errorMessage); } bool HasErrors() const { @@ -63,6 +63,7 @@ class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCo THashSet RequestIds; virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) = 0; + virtual const std::shared_ptr& DoGetAbortionFlag() const = 0; void OnRequestsFinished(TDataAccessorsResult&& result) { DoOnRequestsFinished(std::move(result)); @@ -85,12 +86,18 @@ class IDataAccessorRequestsSubscriber: public NColumnShard::TMonitoringObjectsCo OnRequestsFinished(std::move(*Result)); } } + const std::shared_ptr& GetAbortionFlag() const { + return DoGetAbortionFlag(); + } virtual ~IDataAccessorRequestsSubscriber() = default; }; class TFakeDataAccessorsSubscriber: public IDataAccessorRequestsSubscriber { private: + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& /*result*/) override { } }; @@ -225,6 +232,11 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter& GetAbortionFlag() const { + AFL_VERIFY(HasSubscriber()); + return Subscriber->GetAbortionFlag(); + } + bool HasSubscriber() const { return !!Subscriber; } diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index 6b4666ceae7e..f9cb5dac128a 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -53,7 +53,9 @@ class TReadContext { const TActorId ResourceSubscribeActorId; const TActorId ReadCoordinatorActorId; const TComputeShardingPolicy ComputeShardingPolicy; - TAtomic AbortFlag = 0; + std::shared_ptr AbortionFlag = std::make_shared(0); + std::shared_ptr ConstAbortionFlag = AbortionFlag; + public: template std::shared_ptr GetReadMetadataPtrVerifiedAs() const { @@ -66,13 +68,29 @@ class TReadContext { return ReadMetadata->GetScanCursor(); } + const std::shared_ptr& GetAbortionFlag() const { + return ConstAbortionFlag; + } + void AbortWithError(const TString& errorMessage) { - if (AtomicCas(&AbortFlag, 1, 0)) { + if (AbortionFlag->Inc() == 1) { NActors::TActivationContext::Send( ScanActorId, std::make_unique(TConclusionStatus::Fail(errorMessage))); } } + void Stop() { + AbortionFlag->Inc(); + } + + bool IsActive() const { + return AbortionFlag->Val() == 0; + } + + bool IsAborted() const { + return AbortionFlag->Val(); + } + bool IsReverse() const { return ReadMetadata->IsDescSorted(); } @@ -127,7 +145,8 @@ class TReadContext { , ScanActorId(scanActorId) , ResourceSubscribeActorId(resourceSubscribeActorId) , ReadCoordinatorActorId(readCoordinatorActorId) - , ComputeShardingPolicy(computeShardingPolicy) { + , ComputeShardingPolicy(computeShardingPolicy) + { Y_ABORT_UNLESS(ReadMetadata); } }; diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h index 9f2ae6d4bcba..d0376d74d296 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h @@ -33,7 +33,6 @@ class TSpecialReadContext { YDB_READONLY_DEF(std::shared_ptr, FetchingStageMemory); TReadMetadata::TConstPtr ReadMetadata; - TAtomic AbortFlag = 0; virtual std::shared_ptr DoGetColumnsFetchingPlan(const std::shared_ptr& source) = 0; @@ -67,12 +66,16 @@ class TSpecialReadContext { return ProcessMemoryGuard->GetProcessId(); } + bool IsActive() const { + return !CommonContext->IsAborted(); + } + bool IsAborted() const { - return AtomicGet(AbortFlag); + return CommonContext->IsAborted(); } void Abort() { - AtomicSet(AbortFlag, 1); + CommonContext->Stop(); } virtual ~TSpecialReadContext() { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index 9a9bd23c8571..0e0e5e7695b2 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -232,6 +232,9 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber TFetchingScriptCursor Step; std::shared_ptr Source; const NColumnShard::TCounterGuard Guard; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Source->GetContext()->GetCommonContext()->GetAbortionFlag(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { AFL_VERIFY(!result.HasErrors()); AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size()); diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h index 08ec74361360..adfe861d6319 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h @@ -61,7 +61,7 @@ class TPlainReadData: public IDataReader, TNonCopyable, NColumnShard::TMonitorin return *Scanner; } virtual void OnSentDataFromInterval(const ui32 sourceIdx) const override { - if (SpecialReadContext->IsAborted()) { + if (!SpecialReadContext->IsActive()) { return; } Scanner->ContinueSource(sourceIdx); @@ -71,7 +71,7 @@ class TPlainReadData: public IDataReader, TNonCopyable, NColumnShard::TMonitorin TPlainReadData(const std::shared_ptr& context); ~TPlainReadData() { - if (!SpecialReadContext->IsAborted()) { + if (SpecialReadContext->IsActive()) { Abort("unexpected on destructor"); } } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp index c43890afe0b4..bc4e34df7f17 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/scanner.cpp @@ -116,7 +116,7 @@ TScanHead::TScanHead(std::deque>&& sources, const s } TConclusion TScanHead::BuildNextInterval() { - if (Context->IsAborted()) { + if (!Context->IsActive()) { return false; } bool changed = false; @@ -139,7 +139,7 @@ bool TScanHead::IsReverse() const { } void TScanHead::Abort() { - AFL_VERIFY(Context->IsAborted()); + AFL_VERIFY(!Context->IsActive()); for (auto&& i : FetchingSources) { i->Abort(); } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index af65ff1a8fc6..4281309386e2 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -234,6 +234,10 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber TFetchingScriptCursor Step; std::shared_ptr Source; const NColumnShard::TCounterGuard Guard; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Source->GetContext()->GetCommonContext()->GetAbortionFlag(); + } + virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { AFL_VERIFY(!result.HasErrors()); AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size()); @@ -243,7 +247,6 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber auto task = std::make_shared(Source, std::move(Step), Source->GetContext()->GetCommonContext()->GetScanActorId()); NConveyor::TScanServiceOperator::SendTaskToExecute(task); } - public: TPortionAccessorFetchingSubscriber(const TFetchingScriptCursor& step, const std::shared_ptr& source) : Step(step) diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp index fb238100d264..92fdf0689850 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp @@ -220,4 +220,8 @@ void TStatsIterator::TFetchingAccessorAllocation::DoOnAllocationImpossible(const Context->AbortWithError("cannot allocate memory for take accessors info: " + errorMessage); } +const std::shared_ptr& TStatsIterator::TFetchingAccessorAllocation::DoGetAbortionFlag() const { + return Context->GetAbortionFlag(); +} + } // namespace NKikimr::NOlap::NReader::NSysView::NChunks diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h index 65afefa13fe1..c09a4f6d448b 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.h @@ -107,6 +107,7 @@ class TStatsIterator: public NAbstract::TStatsIterator Context; + virtual const std::shared_ptr& DoGetAbortionFlag() const override; virtual bool DoOnAllocated(std::shared_ptr&& guard, const std::shared_ptr& /*selfPtr*/) override { Guard = std::move(guard); diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 9635c24b3325..3f3a035f6817 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -358,6 +358,9 @@ class TTestCompactionAccessorsSubscriber: public NOlap::IDataAccessorRequestsSub private: std::shared_ptr Changes; const std::shared_ptr VersionedIndex; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { const TDataAccessorsInitializationContext context(VersionedIndex); @@ -438,6 +441,9 @@ class TTestMetadataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubsc std::shared_ptr Processor; TColumnEngineForLogs& Engine; + virtual const std::shared_ptr& DoGetAbortionFlag() const override { + return Default>(); + } virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override { Processor->ApplyResult( NOlap::NResourceBroker::NSubscribe::TResourceContainer::BuildForTest(std::move(result)), Engine);