Skip to content

Commit

Permalink
register memory allocated for metadata in tiering actualizer (#12348)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 10, 2024
1 parent dfc089c commit 34cd26c
Show file tree
Hide file tree
Showing 22 changed files with 254 additions and 114 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class TColumnPortionsAccumulator {
sketchesByColumns.emplace(id, TCountMinSketch::Create());
}

for (const auto& portionInfo : result.GetPortions()) {
for (const auto& [id, portionInfo] : result.GetPortions()) {
std::shared_ptr<NOlap::ISnapshotSchema> portionSchema = portionInfo.GetPortionInfo().GetSchema(*VersionedIndex);
for (const ui32 columnId : ColumnTagsRequested) {
auto indexMeta = portionSchema->GetIndexInfo().GetIndexMetaCountMinSketch({ columnId });
Expand Down
51 changes: 38 additions & 13 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,27 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
}
};

class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
class TDataAccessorsSubscriberBase: public NOlap::IDataAccessorRequestsSubscriber {
private:
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final {
AFL_VERIFY(ResourcesGuard);
DoOnRequestsFinished(std::move(result), std::move(ResourcesGuard));
}

protected:
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& guard) = 0;

public:
void SetResourcesGuard(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
AFL_VERIFY(!ResourcesGuard);
AFL_VERIFY(guard);
ResourcesGuard = guard;
}
};

class TDataAccessorsSubscriber: public TDataAccessorsSubscriberBase {
protected:
const NActors::TActorId ShardActorId;
std::shared_ptr<NOlap::TColumnEngineChanges> Changes;
Expand All @@ -625,8 +645,9 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {

virtual void DoOnRequestsFinishedImpl() = 0;

virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override final {
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result, std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& guard) override final {
Changes->SetFetchedDataAccessors(std::move(result), NOlap::TDataAccessorsInitializationContext(VersionedIndex));
Changes->ResourcesGuard = std::move(guard);
DoOnRequestsFinishedImpl();
}

Expand Down Expand Up @@ -822,7 +843,7 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa
private:
using TBase = NOlap::NResourceBroker::NSubscribe::ITask;
std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
std::shared_ptr<TDataAccessorsSubscriber> Subscriber;
std::shared_ptr<TDataAccessorsSubscriberBase> Subscriber;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;

virtual void DoOnAllocationSuccess(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
Expand All @@ -833,7 +854,7 @@ class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITa

public:
TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriber>& subscriber,
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriberBase>& subscriber,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
: TBase(0, memory, externalTaskId, context)
, Request(std::move(request))
Expand All @@ -852,7 +873,6 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId);

auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, CacheDataAfterWrite);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
std::make_shared<TCompactChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
}
Expand Down Expand Up @@ -895,7 +915,6 @@ class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscribe
virtual void DoOnRequestsFinishedImpl() override {
ACFL_DEBUG("background", "ttl")("need_writes", true);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
std::make_shared<TTTLChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
}
Expand All @@ -920,14 +939,16 @@ class TNoWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscri
using TBase::TBase;
};

class TCSMetadataSubscriber: public NOlap::IDataAccessorRequestsSubscriber, public TObjectCounter<TCSMetadataSubscriber> {
class TCSMetadataSubscriber: public TDataAccessorsSubscriberBase, public TObjectCounter<TCSMetadataSubscriber> {
private:
NActors::TActorId TabletActorId;
const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor> Processor;
const ui64 Generation;
virtual void DoOnRequestsFinished(NOlap::TDataAccessorsResult&& result) override {
virtual void DoOnRequestsFinished(
NOlap::TDataAccessorsResult&& result, std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& guard) override {
NActors::TActivationContext::Send(
TabletActorId, std::make_unique<TEvPrivate::TEvMetadataAccessorsInfo>(Processor, Generation, std::move(result)));
TabletActorId, std::make_unique<TEvPrivate::TEvMetadataAccessorsInfo>(Processor, Generation,
NOlap::NResourceBroker::NSubscribe::TResourceContainer(std::move(result), std::move(guard))));
}

public:
Expand All @@ -947,8 +968,12 @@ void TColumnShard::SetupMetadata() {
}
std::vector<NOlap::TCSMetadataRequest> requests = TablesManager.MutablePrimaryIndex().CollectMetadataRequests();
for (auto&& i : requests) {
i.GetRequest()->RegisterSubscriber(std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()));
DataAccessorsManager->AskData(i.GetRequest());
const ui64 accessorsMemory =
i.GetRequest()->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema());
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor,
std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i.GetRequest()->GetTaskId(), TTLTaskSubscription,
std::shared_ptr<NOlap::TDataAccessorsRequest>(i.GetRequest()),
std::make_shared<TCSMetadataSubscriber>(SelfId(), i.GetProcessor(), Generation()), DataAccessorsManager.GetObjectPtrVerified()));
}
}

Expand Down Expand Up @@ -1004,7 +1029,6 @@ class TCleanupPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriber {
virtual void DoOnRequestsFinishedImpl() override {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("background", "cleanup")("changes_info", Changes->DebugString());
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
NActors::TActivationContext::Send(ShardActorId, std::move(ev));
}
Expand Down Expand Up @@ -1093,7 +1117,8 @@ void TColumnShard::Handle(TEvPrivate::TEvStartCompaction::TPtr& ev, const TActor

void TColumnShard::Handle(TEvPrivate::TEvMetadataAccessorsInfo::TPtr& ev, const TActorContext& /*ctx*/) {
AFL_VERIFY(ev->Get()->GetGeneration() == Generation())("ev", ev->Get()->GetGeneration())("tablet", Generation());
ev->Get()->GetProcessor()->ApplyResult(ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>());
ev->Get()->GetProcessor()->ApplyResult(
ev->Get()->ExtractResult(), TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>());
SetupMetadata();
}

Expand Down
14 changes: 9 additions & 5 deletions ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
#include <ydb/core/tx/columnshard/engines/writer/write_controller.h>
#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/resource_subscriber/container.h>
#include <ydb/core/tx/data_events/write_data.h>
#include <ydb/core/tx/priorities/usage/abstract.h>

Expand Down Expand Up @@ -75,7 +76,7 @@ struct TEvPrivate {
private:
const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor> Processor;
const ui64 Generation;
NOlap::TDataAccessorsResult Result;
std::optional<NOlap::NResourceBroker::NSubscribe::TResourceContainer<NOlap::TDataAccessorsResult>> Result;

public:
const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor>& GetProcessor() const {
Expand All @@ -84,12 +85,15 @@ struct TEvPrivate {
ui64 GetGeneration() const {
return Generation;
}
NOlap::TDataAccessorsResult ExtractResult() {
return std::move(Result);
NOlap::NResourceBroker::NSubscribe::TResourceContainer<NOlap::TDataAccessorsResult> ExtractResult() {
AFL_VERIFY(Result);
auto result = std::move(*Result);
Result.reset();
return result;
}

TEvMetadataAccessorsInfo(
const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor>& processor, const ui64 gen, NOlap::TDataAccessorsResult&& result)
TEvMetadataAccessorsInfo(const std::shared_ptr<NOlap::IMetadataAccessorResultProcessor>& processor, const ui64 gen,
NOlap::NResourceBroker::NSubscribe::TResourceContainer<NOlap::TDataAccessorsResult>&& result)
: Processor(processor)
, Generation(gen)
, Result(std::move(result)) {
Expand Down
42 changes: 21 additions & 21 deletions ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,38 @@
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>

namespace NKikimr::NOlap {

class TDataAccessorsRequest;

class TDataAccessorsResult {
class TDataAccessorsResult: private NNonCopyable::TMoveOnly {
private:
THashMap<ui64, TString> ErrorsByPathId;
THashMap<ui64, std::vector<TPortionDataAccessor>> AccessorsByPathId;
THashMap<ui64, TPortionDataAccessor> PortionsById;
std::vector<TPortionDataAccessor> Portions;

public:
const std::vector<TPortionDataAccessor>& GetPortions() const {
return Portions;
const THashMap<ui64, TPortionDataAccessor>& GetPortions() const {
return PortionsById;
}

std::vector<TPortionDataAccessor> ExtractPortionsVector() {
std::vector<TPortionDataAccessor> portions;
portions.reserve(PortionsById.size());
for (auto&& [_, portionInfo] : PortionsById) {
portions.emplace_back(std::move(portionInfo));
}
return portions;
}

void Merge(TDataAccessorsResult&& result) {
for (auto&& i : result.ErrorsByPathId) {
AFL_VERIFY(ErrorsByPathId.emplace(i.first, i.second).second);
}
for (auto&& i : result.AccessorsByPathId) {
AFL_VERIFY(AccessorsByPathId.emplace(i.first, std::move(i.second)).second);
}
for (auto&& i : result.PortionsById) {
AFL_VERIFY(PortionsById.emplace(i.first, std::move(i.second)).second);
}
Portions.insert(Portions.end(), result.Portions.begin(), result.Portions.end());
}

const TPortionDataAccessor& GetPortionAccessorVerified(const ui64 portionId) const {
Expand All @@ -38,18 +42,10 @@ class TDataAccessorsResult {
return it->second;
}

std::vector<TPortionDataAccessor> ExtractPortionsVector() {
return std::move(Portions);
}

void AddData(const ui64 pathId, THashMap<ui64, TPortionDataAccessor>&& accessors) {
auto info = AccessorsByPathId.emplace(pathId, std::vector<TPortionDataAccessor>());
AFL_VERIFY(info.second);
auto& v = info.first->second;
void AddData(THashMap<ui64, TPortionDataAccessor>&& accessors) {
std::deque<TPortionDataAccessor> v;
for (auto&& [portionId, i] : accessors) {
v.emplace_back(std::move(i));
AFL_VERIFY(PortionsById.emplace(portionId, v.back()).second);
Portions.emplace_back(v.back());
AFL_VERIFY(PortionsById.emplace(portionId, i).second);
}
}

Expand Down Expand Up @@ -301,7 +297,7 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
if (itStatus->second.IsFinished()) {
AFL_VERIFY(FetchingCount.Dec() >= 0);
ReadyCount.Inc();
AccessorsByPathId.AddData(pathId, itStatus->second.DetachAccessors());
AccessorsByPathId.AddData(itStatus->second.DetachAccessors());
PathIdStatus.erase(itStatus);
}
}
Expand All @@ -315,6 +311,10 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
}
}
}

TString GetTaskId() const {
return TStringBuilder() << "data-accessor-request-" << RequestId;
}
};

} // namespace NKikimr::NOlap
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/data_accessor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/portions
ydb/core/tx/columnshard/data_accessor/abstract
ydb/core/tx/columnshard/data_accessor/local_db
ydb/core/tx/columnshard/resource_subscriber
)

END()
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
return;
}
THashMap<TString, THashSet<TUnifiedBlobId>> blobIdsByStorage;
for (auto&& p : FetchedDataAccessors->GetPortions()) {
for (auto&& [_, p] : FetchedDataAccessors->GetPortions()) {
p.RemoveFromDatabase(context.DBWrapper);
p.FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
pathIds.emplace(p.GetPortionInfo().GetPathId());
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <ydb/core/tx/columnshard/common/reverse_accessor.h>
#include <ydb/core/tx/columnshard/counters/common_data.h>
#include <ydb/core/tx/columnshard/resource_subscriber/container.h>
#include <ydb/core/tx/columnshard/tx_reader/abstract.h>

namespace NKikimr::NColumnShard {
Expand Down Expand Up @@ -252,12 +253,12 @@ class TColumnEngineStats {
class TColumnEngineForLogs;
class IMetadataAccessorResultProcessor {
private:
virtual void DoApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine) = 0;
virtual void DoApplyResult(NResourceBroker::NSubscribe::TResourceContainer<TDataAccessorsResult>&& result, TColumnEngineForLogs& engine) = 0;

public:
virtual ~IMetadataAccessorResultProcessor() = default;

void ApplyResult(TDataAccessorsResult&& result, TColumnEngineForLogs& engine) {
void ApplyResult(NResourceBroker::NSubscribe::TResourceContainer<TDataAccessorsResult>&& result, TColumnEngineForLogs& engine) {
return DoApplyResult(std::move(result), engine);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ class TStatsIterator: public NAbstract::TStatsIterator<NKikimr::NSysView::Schema
AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size());
NActors::TActivationContext::AsActorContext().Send(
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(
std::make_shared<TApplyResult>(result.GetPortions(), std::move(WaitingCountersGuard))));
std::make_shared<TApplyResult>(result.ExtractPortionsVector(), std::move(WaitingCountersGuard))));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,7 @@ std::vector<TCSMetadataRequest> TGranuleActualizationIndex::CollectMetadataReque
if (!TieringActualizer) {
return {};
}
auto req = TieringActualizer->BuildMetadataRequest(PathId, portions, TieringActualizer);
if (!req) {
return {};
}
return { *req };
return TieringActualizer->BuildMetadataRequests(PathId, portions, TieringActualizer);
}

}
Loading

0 comments on commit 34cd26c

Please sign in to comment.