Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

data accessor has to own portion info #11060

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
} else {
granule.InsertPortionOnExecute(txc, NOlap::TPortionDataAccessor(*portion.GetPortionInfo()));
granule.InsertPortionOnExecute(txc, NOlap::TPortionDataAccessor(portion.GetPortionInfo()));
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/data_locks/locks/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ class TListPortionsLock: public ILock {
}
}

TListPortionsLock(const TString& lockName, const std::vector<TPortionInfo::TConstPtr>& portions, const bool readOnly = false)
: TBase(lockName, readOnly) {
for (auto&& p : portions) {
Portions.emplace(p->GetAddress());
Granules.emplace(p->GetPathId());
}
}

TListPortionsLock(const TString& lockName, const std::vector<TPortionInfo>& portions, const bool readOnly = false)
: TBase(lockName, readOnly) {
for (auto&& p : portions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet
const TVersionedIndex& index) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
for (auto&& i : Portions) {
auto schema = i.GetSchema(index);
auto schema = i->GetSchema(index);
TPortionDataAccessor(i).FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace NKikimr::NOlap::NDataSharing::NEvents {
class TPathIdData {
private:
YDB_READONLY(ui64, PathId, 0);
YDB_ACCESSOR_DEF(std::vector<TPortionInfo>, Portions);
YDB_ACCESSOR_DEF(std::vector<TPortionInfo::TPtr>, Portions);

TPathIdData() = default;

Expand All @@ -31,7 +31,7 @@ class TPathIdData {
}
PathId = proto.GetPathId();
for (auto&& portionProto : proto.GetPortions()) {
TConclusion<TPortionInfo> portion = TPortionInfo::BuildFromProto(portionProto, indexInfo);
TConclusion<TPortionInfo::TPtr> portion = TPortionInfo::BuildFromProto(portionProto, indexInfo);
if (!portion) {
return portion.GetError();
}
Expand All @@ -41,12 +41,12 @@ class TPathIdData {
}

public:
TPathIdData(const ui64 pathId, const std::vector<TPortionInfo>& portions)
TPathIdData(const ui64 pathId, const std::vector<TPortionInfo::TPtr>& portions)
: PathId(pathId)
, Portions(portions) {
}

std::vector<TPortionInfo> DetachPortions() {
std::vector<TPortionInfo::TPtr> DetachPortions() {
return std::move(Portions);
}
THashMap<TTabletId, TTaskForTablet> BuildLinkTabletTasks(const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId,
Expand All @@ -55,9 +55,9 @@ class TPathIdData {
void InitPortionIds(ui64* lastPortionId, const std::optional<ui64> pathId = {}) {
AFL_VERIFY(lastPortionId);
for (auto&& i : Portions) {
i.SetPortionId(++*lastPortionId);
i->SetPortionId(++*lastPortionId);
if (pathId) {
i.SetPathId(*pathId);
i->SetPathId(*pathId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
auto it = PathIds.find(i.first);
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion.SetPathId(it->second);
index.AppendPortion(std::move(portion));
portion->SetPathId(it->second);
index.AppendPortion(*portion);
}
}
return TConclusionStatus::Success();
Expand Down Expand Up @@ -167,15 +167,15 @@ bool TDestinationSession::DoStart(
THashMap<TString, THashSet<TUnifiedBlobId>> local;
for (auto&& i : portions) {
for (auto&& p : i.second) {
TPortionDataAccessor(*p).FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
TPortionDataAccessor(p).FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
}
}
std::swap(CurrentBlobIds, local);
SendCurrentCursorAck(shard, {});
return true;
}

bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo& portion) {
bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo::TConstPtr& portion) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
TPortionDataAccessor(portion).FillBlobIdsByStorage(blobIds, vIndex);
ui32 containsCounter = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class TDestinationSession: public TCommonSession {
}

public:
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo& portion);
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const std::shared_ptr<const TPortionInfo>& portion);

TSourceCursorForDestination& GetCursorVerified(const TTabletId& tabletId) {
auto it = Cursors.find(tabletId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std
++p;
} else {
i.second.MutablePortions()[p] = std::move(i.second.MutablePortions().back());
i.second.MutablePortions()[p].ResetShardingVersion();
i.second.MutablePortions()[p]->ResetShardingVersion();
i.second.MutablePortions().pop_back();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ void TSourceCursor::BuildSelection(const std::shared_ptr<IStoragesManager>& stor
ui32 chunksCount = 0;
bool selectMore = true;
for (; itCurrentPath != PortionsForSend.end() && selectMore; ++itCurrentPath) {
std::vector<TPortionInfo> portions;
std::vector<TPortionInfo::TPtr> portions;
for (; itPortion != itCurrentPath->second.end(); ++itPortion) {
selectMore = (count < 10000 && chunksCount < 1000000);
if (!selectMore) {
NextPathId = itCurrentPath->first;
NextPortionId = itPortion->first;
} else {
portions.emplace_back(*itPortion->second);
portions.emplace_back(itPortion->second);
chunksCount += TPortionDataAccessor(portions.back()).GetRecords().size();
chunksCount += TPortionDataAccessor(portions.back()).GetIndexes().size();
++count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TSharedBlobsManager;

class TSourceCursor {
private:
std::map<ui64, std::map<ui32, std::shared_ptr<TPortionInfo>>> PortionsForSend;
std::map<ui64, std::map<ui32, TPortionInfo::TPtr>> PortionsForSend;
THashMap<ui64, NEvents::TPathIdData> PreviousSelected;
THashMap<ui64, NEvents::TPathIdData> Selected;
THashMap<TTabletId, TTaskForTablet> Links;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class TColumnEngineChanges {
public:
class IMemoryPredictor {
public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) = 0;
virtual ui64 AddPortion(const TPortionInfo::TConstPtr& portionInfo) = 0;
virtual ~IMemoryPredictor() = default;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ bool TTieringProcessContext::AddPortion(
}
features.OnSkipPortionWithTxLimit(Counters, *dWait);
}
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(*info);
it->second.back().MutableMemoryUsage() = it->second.back().GetMemoryPredictor()->AddPortion(info);
}
it->second.back().MutableTxWriteVolume() += info->GetTxVolume();
if (features.GetTargetTierName() == NTiering::NCommon::DeleteTierName) {
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void TCleanupPortionsColumnEngineChanges::DoDebugString(TStringOutput& out) cons
if (ui32 dropped = PortionsToDrop.size()) {
out << "drop " << dropped << " portions";
for (auto& portionInfo : PortionsToDrop) {
out << portionInfo.DebugString();
out << portionInfo->DebugString();
}
}
}
Expand All @@ -26,7 +26,7 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
for (auto&& p : PortionsToDrop) {
TPortionDataAccessor(p).RemoveFromDatabase(context.DBWrapper);
TPortionDataAccessor(p).FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
pathIds.emplace(p.GetPathId());
pathIds.emplace(p->GetPathId());
}
for (auto&& i : blobIdsByStorage) {
auto action = BlobsAction.GetRemoving(i.first);
Expand All @@ -38,14 +38,14 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC

void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
for (auto& portionInfo : PortionsToDrop) {
if (!context.EngineLogs.ErasePortion(portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.DebugString());
if (!context.EngineLogs.ErasePortion(*portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo->DebugString());
}
}
if (self) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
for (auto&& p : PortionsToDrop) {
self->Counters.GetTabletCounters()->OnDropPortionEvent(p.GetTotalRawBytes(), p.GetTotalBlobBytes(), p.GetRecordsCount());
self->Counters.GetTabletCounters()->OnDropPortionEvent(p->GetTotalRawBytes(), p->GetTotalBlobBytes(), p->GetRecordsCount());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {

}

std::vector<TPortionInfo> PortionsToDrop;
std::vector<TPortionInfo::TConstPtr> PortionsToDrop;

virtual ui32 GetWritePortionsCount() const override {
return 0;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(
dataColumnIds = ISnapshotSchema::GetColumnsWithDifferentDefaults(schemas, resultSchema);
}
for (auto&& i : SwitchedPortions) {
stats->Merge(TPortionDataAccessor(*i).GetSerializationStat(*resultSchema));
stats->Merge(TPortionDataAccessor(i).GetSerializationStat(*resultSchema));
if (i->GetMeta().GetDeletionsCount()) {
dataColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
if (dataColumnIds.size() != resultSchema->GetColumnsCount()) {
for (auto id : TPortionDataAccessor(*i).GetColumnIds()) {
for (auto id : TPortionDataAccessor(i).GetColumnIds()) {
if (resultSchema->HasColumnId(id)) {
dataColumnIds.emplace(id);
}
Expand Down Expand Up @@ -236,8 +236,8 @@ std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCo
return std::make_shared<TMemoryPredictorChunkedPolicy>();
}

ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) {
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo.GetTotalBlobBytes();
ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo::TConstPtr& portionInfo) {
SumMemoryFix += portionInfo->GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo->GetTotalBlobBytes();
++PortionsCount;
SumMemoryDelta = 0;

Expand Down Expand Up @@ -269,7 +269,7 @@ ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPorti
advanceIterator(columnId, maxChunkSize);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_prediction_after", SumMemoryFix + SumMemoryDelta)(
"portion_info", portionInfo.DebugString());
"portion_info", portionInfo->DebugString());
return SumMemoryFix + SumMemoryDelta;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
auto predictor = BuildMemoryPredictor();
ui64 result = 0;
for (auto& p : SwitchedPortions) {
result = predictor->AddPortion(*p);
result = predictor->AddPortion(p);
}
return result;
}
Expand Down Expand Up @@ -65,7 +65,7 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
std::list<TColumnInfo> MaxMemoryByColumnChunk;

public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override;
virtual ui64 AddPortion(const TPortionInfo::TConstPtr& portionInfo) override;
};

static std::shared_ptr<IMemoryPredictor> BuildMemoryPredictor();
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/changes/ttl.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TTTLColumnEngineChanges: public TChangesWithAppend {
auto predictor = BuildMemoryPredictor();
ui64 result = 0;
for (auto& p : PortionsToEvict) {
result = predictor->AddPortion(*p.GetPortionInfo());
result = predictor->AddPortion(p.GetPortionInfo());
}
return result;
}
Expand All @@ -66,11 +66,11 @@ class TTTLColumnEngineChanges: public TChangesWithAppend {
ui64 SumBlobsMemory = 0;
ui64 MaxRawMemory = 0;
public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override {
if (MaxRawMemory < portionInfo.GetTotalRawBytes()) {
MaxRawMemory = portionInfo.GetTotalRawBytes();
virtual ui64 AddPortion(const TPortionInfo::TConstPtr& portionInfo) override {
if (MaxRawMemory < portionInfo->GetTotalRawBytes()) {
MaxRawMemory = portionInfo->GetTotalRawBytes();
}
SumBlobsMemory += portionInfo.GetTotalBlobBytes();
SumBlobsMemory += portionInfo->GetTotalBlobBytes();
return SumBlobsMemory + MaxRawMemory;
}
};
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
};
AppendedPortions.erase(std::remove_if(AppendedPortions.begin(), AppendedPortions.end(), predRemoveDroppedTable), AppendedPortions.end());
for (auto& portionInfoWithBlobs : AppendedPortions) {
auto& portionInfo = portionInfoWithBlobs.GetPortionResult();
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
const auto& portionInfo = portionInfoWithBlobs.GetPortionResultPtr();
AFL_VERIFY(usedPortionIds.emplace(portionInfo->GetPortionId()).second)("portion_info", portionInfo->DebugString(true));
TPortionDataAccessor(portionInfo).SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
}
for (auto&& [_, i] : PortionsToMove) {
Expand Down Expand Up @@ -108,7 +108,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
portion->SetRemoveSnapshot(context.Snapshot);
};
context.EngineLogs.ModifyPortionOnComplete(i, pred);
context.EngineLogs.AddCleanupPortion(*i);
context.EngineLogs.AddCleanupPortion(i);
}
for (auto& portionBuilder : AppendedPortions) {
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult());
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {
for (const auto& [_, portionInfo] : spg->GetPortions()) {
UpdatePortionStats(*portionInfo, EStatsUpdateType::ADD);
if (portionInfo->CheckForCleanup()) {
AddCleanupPortion(*portionInfo);
AddCleanupPortion(portionInfo);
}
}
}
Expand Down Expand Up @@ -382,7 +382,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
limitExceeded = true;
break;
}
changes->PortionsToDrop.push_back(*info);
changes->PortionsToDrop.push_back(info);
++portionsFromDrop;
}
}
Expand All @@ -400,9 +400,9 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
++i;
continue;
}
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
txSize += it->second[i].GetTxVolume();
AFL_VERIFY(it->second[i]->CheckForCleanup(snapshot))("p_snapshot", it->second[i]->GetRemoveSnapshotOptional())("snapshot", snapshot);
if (txSize + it->second[i]->GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
txSize += it->second[i]->GetTxVolume();
} else {
limitExceeded = true;
break;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ class TColumnEngineForLogs: public IColumnEngine {
return TabletId;
}

void AddCleanupPortion(const TPortionInfo& info) {
AFL_VERIFY(info.HasRemoveSnapshot());
CleanupPortions[info.GetRemoveSnapshotVerified().GetPlanInstant()].emplace_back(info);
void AddCleanupPortion(const TPortionInfo::TConstPtr& info) {
AFL_VERIFY(info->HasRemoveSnapshot());
CleanupPortions[info->GetRemoveSnapshotVerified().GetPlanInstant()].emplace_back(info);
}
void AddShardingInfo(const TGranuleShardingInfo& shardingInfo) {
VersionedIndex.AddShardingInfo(shardingInfo);
Expand All @@ -205,7 +205,7 @@ class TColumnEngineForLogs: public IColumnEngine {
TVersionedIndex VersionedIndex;
ui64 TabletId;
TMap<ui64, std::shared_ptr<TColumnEngineStats>> PathStats; // per path_id stats sorted by path_id
std::map<TInstant, std::vector<TPortionInfo>> CleanupPortions;
std::map<TInstant, std::vector<TPortionInfo::TConstPtr>> CleanupPortions;
TColumnEngineStats Counters;
ui64 LastPortion;
ui64 LastGranule;
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
}
AFL_VERIFY(itRecord == Records.end());
AFL_VERIFY(itBlobIdx == BlobIdxs.end());
} else {
for (auto&& i : Records) {
AFL_VERIFY(i.BlobRange.IsValid());
}
for (auto&& i : Indexes) {
if (auto* blobId = i.GetBlobRangeOptional()) {
AFL_VERIFY(blobId->IsValid());
}
}
}

result.Indexes = std::move(Indexes);
Expand Down
Loading
Loading