Skip to content

Commit

Permalink
remove portions constructor hard freeing from tablet
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Jul 8, 2024
1 parent 1f88b11 commit 38e9f7b
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TColumnEngineChanges {
void Start(NColumnShard::TColumnShard& self);

virtual ui32 GetWritePortionsCount() const = 0;
virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0;
virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 index) = 0;
virtual bool NeedWritePortion(const ui32 index) const = 0;

void WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges {
virtual ui32 GetWritePortionsCount() const override {
return 0;
}
virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override {
virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 /*index*/) override {
return nullptr;
}
virtual bool NeedWritePortion(const ui32 /*index*/) const override {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/cleanup_tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges {
virtual ui32 GetWritePortionsCount() const override {
return 0;
}
virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override {
virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 /*index*/) override {
return nullptr;
}
virtual bool NeedWritePortion(const ui32 /*index*/) const override {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,17 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc
TGeneralSerializedSlice slice(std::move(i));
auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true);
AppendedPortions.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), GranuleMeta->GetPathId(),
resultSchema->GetVersion(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager()));
AppendedPortions.back().FillStatistics(resultSchema->GetIndexInfo());
auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), GranuleMeta->GetPathId(),
resultSchema->GetVersion(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager());
constructor.FillStatistics(resultSchema->GetIndexInfo());
NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey()));
NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot());
AppendedPortions.back().GetPortionConstructor().AddMetadata(*resultSchema, deletionsCount, primaryKeys, snapshotKeys);
AppendedPortions.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
constructor.GetPortionConstructor().AddMetadata(*resultSchema, deletionsCount, primaryKeys, snapshotKeys);
constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
if (shardingActual) {
AppendedPortions.back().GetPortionConstructor().SetShardingVersion(shardingActual->GetSnapshotVersion());
constructor.GetPortionConstructor().SetShardingVersion(shardingActual->GetSnapshotVersion());
}
AppendedPortions.emplace_back(std::move(constructor));
recordIdx += slice.GetRecordsCount();
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
if (pathInfo.GetShardingInfo()) {
portion.GetPortionConstructor().SetShardingVersion(pathInfo.GetShardingInfo()->GetSnapshotVersion());
}
AppendedPortions.emplace_back(std::move(portion));
AppendedPortions.emplace_back(TWritePortionInfoWithBlobsResult(std::move(portion)));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/ttl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan
}
}

std::optional<TWritePortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
std::optional<TWritePortionInfoWithBlobsResult> TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
TConstructionContext& context) const
{
const TPortionInfo& portionInfo = info.GetPortionInfo();
Expand All @@ -55,7 +55,7 @@ std::optional<TWritePortionInfoWithBlobs> TTTLColumnEngineChanges::UpdateEvicted
Y_ABORT_UNLESS(portionInfo.GetMeta().GetTierName() != evictFeatures.GetTargetTierName() || blobSchema->GetVersion() < evictFeatures.GetTargetScheme()->GetVersion());

auto portionWithBlobs = TReadPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs, blobSchema->GetIndexInfo());
std::optional<TWritePortionInfoWithBlobs> result = TReadPortionInfoWithBlobs::SyncPortion(
std::optional<TWritePortionInfoWithBlobsResult> result = TReadPortionInfoWithBlobs::SyncPortion(
std::move(portionWithBlobs), blobSchema, evictFeatures.GetTargetScheme(), evictFeatures.GetTargetTierName(), SaverContext.GetStoragesManager(), context.Counters.SplitterCounters);
return std::move(result);
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/ttl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class TTTLColumnEngineChanges: public TChangesWithAppend {
}
};

std::optional<TWritePortionInfoWithBlobs> UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
std::optional<TWritePortionInfoWithBlobsResult> UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs,
TConstructionContext& context) const;

std::vector<TPortionForEviction> PortionsToEvict;
Expand Down
15 changes: 8 additions & 7 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true));
portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
}
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobs& item) {
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) {
auto& portionInfo = item.GetPortionResult();
if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId());
Expand Down Expand Up @@ -102,7 +102,7 @@ void TChangesWithAppend::DoOnAfterCompile() {
}
}

std::vector<TWritePortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch,
std::vector<TWritePortionInfoWithBlobsConstructor> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch,
const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional<NArrow::NSerialization::TSerializerContainer>& overrideSaver) const {
Y_ABORT_UNLESS(batch->num_rows());

Expand All @@ -116,7 +116,7 @@ std::vector<TWritePortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions
if (overrideSaver) {
schema->SetOverrideSerializer(*overrideSaver);
}
std::vector<TWritePortionInfoWithBlobs> out;
std::vector<TWritePortionInfoWithBlobsConstructor> out;
{
std::vector<TBatchSerializedSlice> pages = TBatchSerializedSlice::BuildSimpleSlices(batch, NSplitter::TSplitSettings(), context.Counters.SplitterCounters, schema);
std::vector<TGeneralSerializedSlice> generalPages;
Expand All @@ -134,10 +134,11 @@ std::vector<TWritePortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions
for (auto&& i : packs) {
TGeneralSerializedSlice slice(std::move(i));
auto b = batch->Slice(recordIdx, slice.GetRecordsCount());
out.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager()));
out.back().FillStatistics(resultSchema->GetIndexInfo());
out.back().GetPortionConstructor().AddMetadata(*resultSchema, b);
out.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager());
constructor.FillStatistics(resultSchema->GetIndexInfo());
constructor.GetPortionConstructor().AddMetadata(*resultSchema, b);
constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId);
out.emplace_back(std::move(constructor));
recordIdx += slice.GetRecordsCount();
}
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/changes/with_appended.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TChangesWithAppend: public TColumnEngineChanges {
virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override;
virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override;
virtual void DoStart(NColumnShard::TColumnShard& self) override;
std::vector<TWritePortionInfoWithBlobs> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule,
std::vector<TWritePortionInfoWithBlobsConstructor> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule,
const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional<NArrow::NSerialization::TSerializerContainer>& overrideSaver) const;

virtual void DoDebugString(TStringOutput& out) const override {
Expand Down Expand Up @@ -61,11 +61,11 @@ class TChangesWithAppend: public TColumnEngineChanges {
AFL_VERIFY(PortionsToRemove.emplace(info.GetAddress(), info).second);
}

std::vector<TWritePortionInfoWithBlobs> AppendedPortions;
std::vector<TWritePortionInfoWithBlobsResult> AppendedPortions;
virtual ui32 GetWritePortionsCount() const override {
return AppendedPortions.size();
}
virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override {
virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 index) override {
Y_ABORT_UNLESS(index < AppendedPortions.size());
return &AppendedPortions[index];
}
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ bool TReadPortionInfoWithBlobs::ExtractColumnChunks(const ui32 entityId, std::ve
return true;
}

std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion(TReadPortionInfoWithBlobs&& source,
std::optional<TWritePortionInfoWithBlobsResult> TReadPortionInfoWithBlobs::SyncPortion(TReadPortionInfoWithBlobs&& source,
const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages,
std::shared_ptr<NColumnShard::TSplitterCounters> counters) {
if (from->GetVersion() == to->GetVersion() && targetTier == source.GetPortionInfo().GetTierNameDef(IStoragesManager::DefaultStorageId)) {
Expand Down Expand Up @@ -163,8 +163,7 @@ std::optional<TWritePortionInfoWithBlobs> TReadPortionInfoWithBlobs::SyncPortion
}
constructor.MutableMeta().ResetStatisticsStorage(std::move(storage));

TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), std::move(constructor), storages);
return result;
return TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), std::move(constructor), storages);
}

const TString& TReadPortionInfoWithBlobs::GetBlobByAddressVerified(const ui32 columnId, const ui32 chunkId) const {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/portions/read_with_blobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
namespace NKikimr::NOlap {

class TVersionedIndex;
class TWritePortionInfoWithBlobs;
class TWritePortionInfoWithBlobsResult;

class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs {
private:
Expand Down Expand Up @@ -40,7 +40,7 @@ class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs {
const TIndexInfo& indexInfo);

std::shared_ptr<arrow::RecordBatch> GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set<std::string>& columnNames = {}) const;
static std::optional<TWritePortionInfoWithBlobs> SyncPortion(TReadPortionInfoWithBlobs&& source,
static std::optional<TWritePortionInfoWithBlobsResult> SyncPortion(TReadPortionInfoWithBlobs&& source,
const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages,
std::shared_ptr<NColumnShard::TSplitterCounters> counters);

Expand Down
37 changes: 28 additions & 9 deletions ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

namespace NKikimr::NOlap {

void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs& owner, const std::shared_ptr<IPortionDataChunk>& chunk) {
void TWritePortionInfoWithBlobsConstructor::TBlobInfo::AddChunk(TWritePortionInfoWithBlobsConstructor& owner, const std::shared_ptr<IPortionDataChunk>& chunk) {
AFL_VERIFY(chunk);
Y_ABORT_UNLESS(!ResultBlob);
Y_ABORT_UNLESS(!Finished);
const TString& data = chunk->GetData();

TBlobRangeLink16 bRange(Size, data.size());
Expand All @@ -17,14 +17,14 @@ void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs&
chunk->AddIntoPortionBeforeBlob(bRange, owner.GetPortionConstructor());
}

void TWritePortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) {
void TWritePortionInfoWithBlobsResult::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobsResult& owner, const TUnifiedBlobId& blobId) const {
const TBlobRangeLink16::TLinkId idx = owner.GetPortionConstructor().RegisterBlobId(blobId);
for (auto&& i : Chunks) {
owner.GetPortionConstructor().RegisterBlobIdx(i.first, idx);
owner.GetPortionConstructor().RegisterBlobIdx(i, idx);
}
}

TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::BuildByBlobs(std::vector<TSplittedBlob>&& chunks,
const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr<IStoragesManager>& operators)
{
TPortionInfoConstructor constructor(granule);
Expand All @@ -33,8 +33,8 @@ TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<
return BuildByBlobs(std::move(chunks), std::move(constructor), operators);
}

TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, TPortionInfoConstructor&& constructor, const std::shared_ptr<IStoragesManager>& operators) {
TWritePortionInfoWithBlobs result(std::move(constructor));
TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::BuildByBlobs(std::vector<TSplittedBlob>&& chunks, TPortionInfoConstructor&& constructor, const std::shared_ptr<IStoragesManager>& operators) {
TWritePortionInfoWithBlobsConstructor result(std::move(constructor));
for (auto&& blob : chunks) {
auto storage = operators->GetOperatorVerified(blob.GetGroupName());
auto blobInfo = result.StartBlob(storage);
Expand All @@ -45,7 +45,7 @@ TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector<
return result;
}

std::vector<std::shared_ptr<IPortionDataChunk>> TWritePortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const {
std::vector<std::shared_ptr<IPortionDataChunk>> TWritePortionInfoWithBlobsConstructor::GetEntityChunks(const ui32 entityId) const {
std::map<TChunkAddress, std::shared_ptr<IPortionDataChunk>> sortedChunks;
for (auto&& b : GetBlobs()) {
for (auto&& i : b.GetChunks()) {
Expand All @@ -62,7 +62,7 @@ std::vector<std::shared_ptr<IPortionDataChunk>> TWritePortionInfoWithBlobs::GetE
return result;
}

void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) {
void TWritePortionInfoWithBlobsConstructor::FillStatistics(const TIndexInfo& index) {
NStatistics::TPortionStorage storage;
for (auto&& i : index.GetStatisticsByName()) {
THashMap<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> data;
Expand All @@ -74,4 +74,23 @@ void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) {
GetPortionConstructor().MutableMeta().SetStatisticsStorage(std::move(storage));
}

TString TWritePortionInfoWithBlobsResult::GetBlobByRangeVerified(const ui32 entityId, const ui32 chunkIdx) const {
AFL_VERIFY(!!PortionConstructor);
for (auto&& rec : PortionConstructor->GetRecords()) {
if (rec.GetEntityId() != entityId || rec.GetChunkIdx() != chunkIdx) {
continue;
}
for (auto&& i : Blobs) {
for (auto&& c : i.GetChunks()) {
if (c == TChunkAddress(entityId, chunkIdx)) {
return i.GetResultBlob().substr(rec.BlobRange.Offset, rec.BlobRange.Size);
}
}
}
AFL_VERIFY(false);
}
AFL_VERIFY(false);
return "";
}

}
Loading

0 comments on commit 38e9f7b

Please sign in to comment.