Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

accessors fetching control #12859

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ class TColumnPortionsAccumulator {
const std::shared_ptr<TResultAccumulator> Result;
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
const std::set<ui32> ColumnTagsRequested;
// Abortion-flag hook of the accessor-request subscriber interface.
// This subscriber does not keep a flag of its own: it hands back the shared
// instance produced by Default<>() (presumably an empty pointer, i.e. "never
// aborted" - confirm against the Default<> helper).
virtual const std::shared_ptr<const TAtomicCounter>& DoGetAbortionFlag() const override {
return Default<std::shared_ptr<const TAtomicCounter>>();
}

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override {
THashMap<ui32, std::unique_ptr<TCountMinSketch>> sketchesByColumns;
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,9 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber {
private:
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;
// Abortion-flag hook of IDataAccessorRequestsSubscriber.
// The base subscriber exposes no dedicated flag and returns the shared
// instance produced by Default<>() (presumably an empty pointer, i.e. the
// request is never treated as aborted - confirm against the Default<> helper).
virtual const std::shared_ptr<const TAtomicCounter>& DoGetAbortionFlag() const override {
return Default<std::shared_ptr<const TAtomicCounter>>();
}

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final {
AFL_VERIFY(ResourcesGuard);
Expand Down Expand Up @@ -1422,13 +1425,13 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {

public:
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
std::vector<NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
: TBase(self)
, FetchCallback(fetchCallback)
, Consumer(consumer)
{
for (auto&& i : portions) {
PortionsByPath[i.second->GetPathId()].emplace_back(i.second);
PortionsByPath[i->GetPathId()].emplace_back(i);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@

namespace NKikimr::NOlap::NDataAccessorControl {

THashMap<ui64, TPortionDataAccessor> IGranuleDataAccessor::AskData(
void IGranuleDataAccessor::AskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
AFL_VERIFY(portions.size());
return DoAskData(portions, callback, consumer);
DoAskData(portions, callback, consumer);
}

// Public entry point for categorizing the requested portions.
// Forwards unchanged to the implementation-specific DoAnalyzeData() and
// returns its TDataCategorized result. Unlike AskData(), no non-empty check
// is applied to `portions` here.
TDataCategorized IGranuleDataAccessor::AnalyzeData(
const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) {
return DoAnalyzeData(portions, consumer);
}

void TActorAccessorsCallback::OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) {
Expand Down
20 changes: 18 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/abstract/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,27 @@ class TActorAccessorsCallback: public IAccessorCallback {
}
};

class TDataCategorized {
private:
YDB_READONLY_DEF(std::vector<TPortionInfo::TConstPtr>, PortionsToAsk);
YDB_READONLY_DEF(std::vector<TPortionDataAccessor>, CachedAccessors);

public:
void AddToAsk(const TPortionInfo::TConstPtr& p) {
PortionsToAsk.emplace_back(p);
}
void AddFromCache(const TPortionDataAccessor& accessor) {
CachedAccessors.emplace_back(accessor);
}
};

class IGranuleDataAccessor {
private:
const ui64 PathId;

virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
virtual void DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) = 0;
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) = 0;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) = 0;

public:
Expand All @@ -39,8 +54,9 @@ class IGranuleDataAccessor {
: PathId(pathId) {
}

THashMap<ui64, TPortionDataAccessor> AskData(
void AskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer);
TDataCategorized AnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer);
// Public entry point for updating the set of known portions: `add` carries
// accessors to register, `remove` carries portion ids to drop.
// Forwards unchanged to the implementation-specific DoModifyPortions().
void ModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
return DoModifyPortions(add, remove);
}
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/tx/columnshard/data_accessor/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,12 @@ class TEvUnregisterController

class TEvAskTabletDataAccessors: public NActors::TEventLocal<TEvAskTabletDataAccessors, NColumnShard::TEvPrivate::EEv::EvAskTabletDataAccessors> {
private:
using TPortions = THashMap<ui64, TPortionInfo::TConstPtr>;
YDB_ACCESSOR_DEF(TPortions, Portions);
YDB_ACCESSOR_DEF(std::vector<TPortionInfo::TConstPtr>, Portions);
YDB_READONLY_DEF(std::shared_ptr<NDataAccessorControl::IAccessorCallback>, Callback);
YDB_READONLY_DEF(TString, Consumer);

public:
explicit TEvAskTabletDataAccessors(const THashMap<ui64, TPortionInfo::TConstPtr>& portions,
explicit TEvAskTabletDataAccessors(const std::vector<TPortionInfo::TConstPtr>& portions,
const std::shared_ptr<NDataAccessorControl::IAccessorCallback>& callback, const TString& consumer)
: Portions(portions)
, Callback(callback)
Expand Down
14 changes: 9 additions & 5 deletions ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,19 @@

namespace NKikimr::NOlap::NDataAccessorControl::NInMem {

THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
void TCollector::DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& /*callback*/, const TString& /*consumer*/) {
THashMap<ui64, TPortionDataAccessor> accessors;
AFL_VERIFY(portions.empty());
}

TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& /*consumer*/) {
TDataCategorized result;
for (auto&& i : portions) {
auto it = Accessors.find(i->GetPortionId());
AFL_VERIFY(it != Accessors.end());
accessors.emplace(i->GetPortionId(), it->second);
result.AddFromCache(it->second);
}
return accessors;
return result;
}

void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
Expand All @@ -22,4 +26,4 @@ void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
}
}

}
} // namespace NKikimr::NOlap::NDataAccessorControl::NInMem
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/in_mem/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ class TCollector: public IGranuleDataAccessor {
private:
using TBase = IGranuleDataAccessor;
THashMap<ui64, TPortionDataAccessor> Accessors;
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
virtual void DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback,
const TString& consumer) override;
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) override;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
const std::vector<ui64>& remove) override;

Expand Down
22 changes: 12 additions & 10 deletions ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@
#include <ydb/core/tx/columnshard/data_accessor/events.h>
namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB {

THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
void TCollector::DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
THashMap<ui64, TPortionDataAccessor> accessors;
THashMap<ui64, TPortionInfo::TConstPtr> portionsToDirectAsk;
if (portions.size()) {
NActors::TActivationContext::Send(
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portions, callback, consumer));
}
}

TDataCategorized TCollector::DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) {
TDataCategorized result;
for (auto&& p : portions) {
auto it = AccessorsCache.Find(p->GetPortionId());
if (it != AccessorsCache.End() && it.Key() == p->GetPortionId()) {
accessors.emplace(p->GetPortionId(), it.Value());
result.AddFromCache(it.Value());
} else {
portionsToDirectAsk.emplace(p->GetPortionId(), p);
result.AddToAsk(p);
}
}
if (portionsToDirectAsk.size()) {
NActors::TActivationContext::Send(
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portionsToDirectAsk, callback, consumer));
}
return accessors;
return result;
}

void TCollector::DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ class TCollector: public IGranuleDataAccessor {

TLRUCache<ui64, TPortionDataAccessor, TNoopDelete, TMetadataSizeProvider> AccessorsCache;
using TBase = IGranuleDataAccessor;
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
virtual void DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
virtual TDataCategorized DoAnalyzeData(const std::vector<TPortionInfo::TConstPtr>& portions, const TString& consumer) override;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) override;

public:
Expand Down
108 changes: 108 additions & 0 deletions ydb/core/tx/columnshard/data_accessor/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,112 @@

namespace NKikimr::NOlap::NDataAccessorControl {

void TLocalManager::DrainQueue() {
THashMap<ui64, std::vector<TPortionInfo::TConstPtr>> portionsToAsk;
std::optional<ui64> lastPathId;
IGranuleDataAccessor* lastDataAccessor = nullptr;
ui32 countToFlight = 0;
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
while (PortionsAskInFlight + countToFlight < 1000 && PortionsAsk.size()) {
if (PortionsAsk.front().GetAbortionFlag() && PortionsAsk.front().GetAbortionFlag()->Val()) {
PortionsAsk.pop_front();
continue;
}
auto p = PortionsAsk.front().ExtractPortion();
PortionsAsk.pop_front();
if (!lastPathId || *lastPathId != p->GetPathId()) {
lastPathId = p->GetPathId();
auto it = Managers.find(p->GetPathId());
if (it == Managers.end()) {
lastDataAccessor = nullptr;
} else {
lastDataAccessor = it->second.get();
}
}
if (!lastDataAccessor) {
auto it = RequestsByPortion.find(p->GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
if (!i->IsFetched()) {
i->AddError(p->GetPathId(), "path id absent");
}
}
RequestsByPortion.erase(it);
} else {
portionsToAsk[p->GetPathId()].emplace_back(p);
++countToFlight;
}
}
for (auto&& i : portionsToAsk) {
auto it = Managers.find(i.first);
AFL_VERIFY(it != Managers.end());
auto dataAnalyzed = it->second->AnalyzeData(i.second, "ANALYZE");
for (auto&& accessor : dataAnalyzed.GetCachedAccessors()) {
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
AFL_VERIFY(it != RequestsByPortion.end());
for (auto&& i : it->second) {
if (!i->IsFetched()) {
i->AddAccessor(accessor);
}
}
RequestsByPortion.erase(it);
AFL_VERIFY(countToFlight);
--countToFlight;
}
if (dataAnalyzed.GetPortionsToAsk().size()) {
it->second->AskData(dataAnalyzed.GetPortionsToAsk(), AccessorCallback, "ANALYZE");
}
}
}
PortionsAskInFlight += countToFlight;
}

// Registers `request` as a waiter for every portion it needs and enqueues the
// portions that nobody has asked for yet.
//
// A portion already present in RequestsByPortion only gets the request appended
// to its waiter list. A new portion gets a fresh waiter list and is pushed to
// PortionsAsk together with the request's abortion flag. The queue is drained
// once all path ids of the request have been processed.
void TLocalManager::DoAskData(const std::shared_ptr<TDataAccessorsRequest>& request) {
    AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ask_data")("request", request->DebugString());
    for (auto&& pathId : request->GetPathIds()) {
        auto portions = request->StartFetching(pathId);
        for (auto&& [_, portion] : portions) {
            const auto portionId = portion->GetPortionId();
            if (auto itWaiters = RequestsByPortion.find(portionId); itWaiters != RequestsByPortion.end()) {
                // Somebody already waits for this portion: just join the waiters.
                itWaiters->second.emplace_back(request);
                continue;
            }
            // First requester: create the waiter list and queue the portion.
            AFL_VERIFY(RequestsByPortion.emplace(portionId, std::vector<std::shared_ptr<TDataAccessorsRequest>>({ request })).second);
            PortionsAsk.emplace_back(portion, request->GetAbortionFlag());
        }
    }
    DrainQueue();
}

void TLocalManager::DoRegisterController(std::unique_ptr<IGranuleDataAccessor>&& controller, const bool update) {
if (update) {
auto it = Managers.find(controller->GetPathId());
if (it != Managers.end()) {
it->second = std::move(controller);
}
} else {
AFL_VERIFY(Managers.emplace(controller->GetPathId(), std::move(controller)).second);
}
}

void TLocalManager::DoAddPortion(const TPortionDataAccessor& accessor) {
{
auto it = Managers.find(accessor.GetPortionInfo().GetPathId());
AFL_VERIFY(it != Managers.end());
it->second->ModifyPortions({ accessor }, {});
}
{
auto it = RequestsByPortion.find(accessor.GetPortionInfo().GetPortionId());
if (it != RequestsByPortion.end()) {
for (auto&& i : it->second) {
i->AddAccessor(accessor);
}
AFL_VERIFY(PortionsAskInFlight);
--PortionsAskInFlight;
}
RequestsByPortion.erase(it);
}
DrainQueue();
}

} // namespace NKikimr::NOlap::NDataAccessorControl
Loading
Loading