Skip to content

Commit

Permalink
Use portion data accessor (ydb-platform#11074)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 4057d49 commit ee80d28
Show file tree
Hide file tree
Showing 51 changed files with 324 additions and 302 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
} else {
granule.InsertPortionOnExecute(txc, NOlap::TPortionDataAccessor(portion.GetPortionInfo()));
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
}
}
}
Expand Down Expand Up @@ -99,7 +99,7 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
}
granule.InsertPortionOnComplete(portion.GetPortionInfo());
granule.InsertPortionOnComplete(portion.GetPortionInfo().MutablePortionInfoPtr());
}
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
AFL_VERIFY(CommitSnapshot);
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/tx/columnshard/data_locks/locks/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,17 @@ class TListPortionsLock: public ILock {
return Portions.empty();
}
public:
TListPortionsLock(const TString& lockName, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const bool readOnly = false)
TListPortionsLock(const TString& lockName, const std::vector<TPortionDataAccessor>& portions, const bool readOnly = false)
: TBase(lockName, readOnly)
{
for (auto&& p : portions) {
Portions.emplace(p.GetPortionInfo().GetAddress());
Granules.emplace(p.GetPortionInfo().GetPathId());
}
}

TListPortionsLock(const TString& lockName, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const bool readOnly = false)
: TBase(lockName, readOnly) {
for (auto&& p : portions) {
Portions.emplace(p->GetAddress());
Granules.emplace(p->GetPathId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) {

AFL_VERIFY(!!LockGuard);
const auto& index = shard.GetIndexAs<TColumnEngineForLogs>();
THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>> portionsByPath;
THashMap<ui64, std::vector<TPortionDataAccessor>> portionsByPath;
THashSet<TString> StoragesIds;
for (auto&& i : GetPathIdsForStart()) {
const auto& g = index.GetGranuleVerified(i);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/data_sharing/common/session/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TColumnShard;

namespace NKikimr::NOlap {
class TPortionInfo;
class TPortionDataAccessor;
namespace NDataLocks {
class TManager;
}
Expand Down Expand Up @@ -42,7 +43,7 @@ class TCommonSession {
EState State = EState::Created;
protected:
TTransferContext TransferContext;
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) = 0;
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) = 0;
virtual THashSet<ui64> GetPathIdsForStart() const = 0;
public:
virtual ~TCommonSession() = default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

namespace NKikimr::NOlap::NDataSharing::NEvents {

THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet> TPathIdData::BuildLinkTabletTasks(
THashMap<TTabletId, TTaskForTablet> TPathIdData::BuildLinkTabletTasks(
const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId, const TTransferContext& context,
const TVersionedIndex& index) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
for (auto&& i : Portions) {
auto schema = i->GetSchema(index);
TPortionDataAccessor(i).FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
auto schema = i.GetPortionInfo().GetSchema(index);
i.FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
}

const std::shared_ptr<TSharedBlobsManager> sharedBlobs = storages->GetSharedBlobsManager();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace NKikimr::NOlap::NDataSharing::NEvents {
class TPathIdData {
private:
YDB_READONLY(ui64, PathId, 0);
YDB_ACCESSOR_DEF(std::vector<TPortionInfo::TPtr>, Portions);
YDB_ACCESSOR_DEF(std::vector<TPortionDataAccessor>, Portions);

TPathIdData() = default;

Expand All @@ -31,7 +31,7 @@ class TPathIdData {
}
PathId = proto.GetPathId();
for (auto&& portionProto : proto.GetPortions()) {
TConclusion<TPortionInfo::TPtr> portion = TPortionInfo::BuildFromProto(portionProto, indexInfo);
TConclusion<TPortionDataAccessor> portion = TPortionDataAccessor::BuildFromProto(portionProto, indexInfo);
if (!portion) {
return portion.GetError();
}
Expand All @@ -41,12 +41,12 @@ class TPathIdData {
}

public:
TPathIdData(const ui64 pathId, const std::vector<TPortionInfo::TPtr>& portions)
TPathIdData(const ui64 pathId, const std::vector<TPortionDataAccessor>& portions)
: PathId(pathId)
, Portions(portions) {
}

std::vector<TPortionInfo::TPtr> DetachPortions() {
std::vector<TPortionDataAccessor> DetachPortions() {
return std::move(Portions);
}
THashMap<TTabletId, TTaskForTablet> BuildLinkTabletTasks(const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId,
Expand All @@ -55,17 +55,17 @@ class TPathIdData {
void InitPortionIds(ui64* lastPortionId, const std::optional<ui64> pathId = {}) {
AFL_VERIFY(lastPortionId);
for (auto&& i : Portions) {
i->SetPortionId(++*lastPortionId);
i.MutablePortionInfo().SetPortionId(++*lastPortionId);
if (pathId) {
i->SetPathId(*pathId);
i.MutablePortionInfo().SetPathId(*pathId);
}
}
}

void SerializeToProto(NKikimrColumnShardDataSharingProto::TPathIdData& proto) const {
proto.SetPathId(PathId);
for (auto&& i : Portions) {
TPortionDataAccessor(i).SerializeToProto(*proto.AddPortions());
i.SerializeToProto(*proto.AddPortions());
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
auto it = PathIds.find(i.first);
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion->SetPathId(it->second);
index.AppendPortion(*portion);
portion.MutablePortionInfo().SetPathId(it->second);
index.AppendPortion(portion.GetPortionInfo());
}
}
return TConclusionStatus::Success();
Expand Down Expand Up @@ -161,23 +161,23 @@ NKikimr::TConclusionStatus TDestinationSession::DeserializeCursorFromProto(
}

bool TDestinationSession::DoStart(
const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) {
const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) {
AFL_VERIFY(IsConfirmed());
NYDBTest::TControllers::GetColumnShardController()->OnDataSharingStarted(shard.TabletID(), GetSessionId());
THashMap<TString, THashSet<TUnifiedBlobId>> local;
for (auto&& i : portions) {
for (auto&& p : i.second) {
TPortionDataAccessor(p).FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
p.FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
}
}
std::swap(CurrentBlobIds, local);
SendCurrentCursorAck(shard, {});
return true;
}

bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo::TConstPtr& portion) {
bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionDataAccessor& portion) {
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
TPortionDataAccessor(portion).FillBlobIdsByStorage(blobIds, vIndex);
portion.FillBlobIdsByStorage(blobIds, vIndex);
ui32 containsCounter = 0;
ui32 newCounter = 0;
for (auto&& i : blobIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TDestinationSession: public TCommonSession {
THashMap<TString, THashSet<TUnifiedBlobId>> CurrentBlobIds;

protected:
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) override;
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) override;
virtual THashSet<ui64> GetPathIdsForStart() const override {
THashSet<ui64> result;
for (auto&& i : PathIds) {
Expand All @@ -88,7 +88,7 @@ class TDestinationSession: public TCommonSession {
}

public:
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const std::shared_ptr<const TPortionInfo>& portion);
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionDataAccessor& portion);

TSourceCursorForDestination& GetCursorVerified(const TTabletId& tabletId) {
auto it = Cursors.find(tabletId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
THashMap<TString, THashSet<NBlobCache::TUnifiedBlobId>> sharedBlobIds;
for (auto&& i : PortionsByPathId) {
for (auto&& p : i.second.GetPortions()) {
TPortionDataAccessor(p).SaveToDatabase(dbWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
p.SaveToDatabase(dbWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
}
}
NIceDb::TNiceDb db(txc.DB);
Expand All @@ -47,7 +47,7 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std
++p;
} else {
i.second.MutablePortions()[p] = std::move(i.second.MutablePortions().back());
i.second.MutablePortions()[p]->ResetShardingVersion();
i.second.MutablePortions()[p].MutablePortionInfo().ResetShardingVersion();
i.second.MutablePortions().pop_back();
}
}
Expand Down
15 changes: 7 additions & 8 deletions ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ void TSourceCursor::BuildSelection(const std::shared_ptr<IStoragesManager>& stor
ui32 chunksCount = 0;
bool selectMore = true;
for (; itCurrentPath != PortionsForSend.end() && selectMore; ++itCurrentPath) {
std::vector<TPortionInfo::TPtr> portions;
std::vector<TPortionDataAccessor> portions;
for (; itPortion != itCurrentPath->second.end(); ++itPortion) {
selectMore = (count < 10000 && chunksCount < 1000000);
if (!selectMore) {
NextPathId = itCurrentPath->first;
NextPortionId = itPortion->first;
} else {
portions.emplace_back(itPortion->second);
chunksCount += TPortionDataAccessor(portions.back()).GetRecords().size();
chunksCount += TPortionDataAccessor(portions.back()).GetIndexes().size();
chunksCount += portions.back().GetRecords().size();
chunksCount += portions.back().GetIndexes().size();
++count;
}
}
Expand Down Expand Up @@ -158,16 +158,15 @@ void TSourceCursor::SaveToDatabase(NIceDb::TNiceDb& db, const TString& sessionId
}

bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManager,
const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index) {
const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions, const TVersionedIndex& index) {
AFL_VERIFY(!IsStartedFlag);
std::map<ui64, std::map<ui32, std::shared_ptr<TPortionInfo>>> local;
std::vector<std::shared_ptr<TPortionInfo>> portionsLock;
std::map<ui64, std::map<ui32, TPortionDataAccessor>> local;
NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(0);
for (auto&& i : portions) {
hashCalcer.Start();
std::map<ui32, std::shared_ptr<TPortionInfo>> portionsMap;
std::map<ui32, TPortionDataAccessor> portionsMap;
for (auto&& p : i.second) {
const ui64 portionId = p->GetPortionId();
const ui64 portionId = p.GetPortionInfo().GetPortionId();
hashCalcer.Update((ui8*)&portionId, sizeof(portionId));
AFL_VERIFY(portionsMap.emplace(portionId, p).second);
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/data_sharing/source/session/cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TSharedBlobsManager;

class TSourceCursor {
private:
std::map<ui64, std::map<ui32, TPortionInfo::TPtr>> PortionsForSend;
std::map<ui64, std::map<ui32, TPortionDataAccessor>> PortionsForSend;
THashMap<ui64, NEvents::TPathIdData> PreviousSelected;
THashMap<ui64, NEvents::TPathIdData> Selected;
THashMap<TTabletId, TTaskForTablet> Links;
Expand Down Expand Up @@ -105,8 +105,8 @@ class TSourceCursor {

void SaveToDatabase(class NIceDb::TNiceDb& db, const TString& sessionId);

bool Start(const std::shared_ptr<IStoragesManager>& storagesManager,
const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index);
bool Start(const std::shared_ptr<IStoragesManager>& storagesManager, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions,
const TVersionedIndex& index);
[[nodiscard]] TConclusionStatus DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic& proto,
const NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic& protoStatic);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void TSourceSession::ActualizeDestination(const NColumnShard::TColumnShard& shar
}
}

bool TSourceSession::DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) {
bool TSourceSession::DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) {
AFL_VERIFY(Cursor);
if (Cursor->Start(shard.GetStoragesManager(), portions, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex())) {
ActualizeDestination(shard, shard.GetDataLocksManager());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class TSourceSession: public TCommonSession {
YDB_READONLY_DEF(std::set<ui64>, PathIds);
TTabletId DestinationTabletId = TTabletId(0);
protected:
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) override;
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) override;
virtual THashSet<ui64> GetPathIdsForStart() const override {
THashSet<ui64> result;
for (auto&& i : PathIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc
auto& index = Self->GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
for (auto&& [_, i] : Session->GetCursorVerified()->GetPreviousSelected()) {
for (auto&& portion : i.GetPortions()) {
TPortionDataAccessor(portion).FillBlobIdsByStorage(sharedBlobIds, index);
portion.FillBlobIdsByStorage(sharedBlobIds, index);
}
}
for (auto&& i : sharedBlobIds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ bool TTieringProcessContext::AddPortion(
} else {
Counters.OnPortionToEvict(info->GetTotalBlobBytes(), *dWait);
}
it->second.back().GetTask()->AddPortionToEvict(info, std::move(features));
it->second.back().GetTask()->AddPortionToEvict(TPortionDataAccessor(info), std::move(features));
AFL_VERIFY(!it->second.back().GetTask()->HasPortionsToRemove())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString());
}
return true;
Expand Down
15 changes: 8 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void TCleanupPortionsColumnEngineChanges::DoDebugString(TStringOutput& out) cons
if (ui32 dropped = PortionsToDrop.size()) {
out << "drop " << dropped << " portions";
for (auto& portionInfo : PortionsToDrop) {
out << portionInfo->DebugString();
out << portionInfo.GetPortionInfo().DebugString();
}
}
}
Expand All @@ -24,9 +24,9 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
}
THashMap<TString, THashSet<TUnifiedBlobId>> blobIdsByStorage;
for (auto&& p : PortionsToDrop) {
TPortionDataAccessor(p).RemoveFromDatabase(context.DBWrapper);
TPortionDataAccessor(p).FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
pathIds.emplace(p->GetPathId());
p.RemoveFromDatabase(context.DBWrapper);
p.FillBlobIdsByStorage(blobIdsByStorage, context.EngineLogs.GetVersionedIndex());
pathIds.emplace(p.GetPortionInfo().GetPathId());
}
for (auto&& i : blobIdsByStorage) {
auto action = BlobsAction.GetRemoving(i.first);
Expand All @@ -38,14 +38,15 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC

void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
for (auto& portionInfo : PortionsToDrop) {
if (!context.EngineLogs.ErasePortion(*portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo->DebugString());
if (!context.EngineLogs.ErasePortion(portionInfo.GetPortionInfo())) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo.GetPortionInfo().DebugString());
}
}
if (self) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
for (auto&& p : PortionsToDrop) {
self->Counters.GetTabletCounters()->OnDropPortionEvent(p->GetTotalRawBytes(), p->GetTotalBlobBytes(), p->GetRecordsCount());
self->Counters.GetTabletCounters()->OnDropPortionEvent(
p.GetPortionInfo().GetTotalRawBytes(), p.GetPortionInfo().GetTotalBlobBytes(), p.GetPortionInfo().GetRecordsCount());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {

}

std::vector<TPortionInfo::TConstPtr> PortionsToDrop;
std::vector<TPortionDataAccessor> PortionsToDrop;

virtual ui32 GetWritePortionsCount() const override {
return 0;
Expand Down
Loading

0 comments on commit ee80d28

Please sign in to comment.