diff --git a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h index 7234e4710f53..721270ea63f3 100644 --- a/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h +++ b/ydb/core/tx/columnshard/engines/changes/abstract/abstract.h @@ -275,7 +275,7 @@ class TColumnEngineChanges { void Start(NColumnShard::TColumnShard& self); virtual ui32 GetWritePortionsCount() const = 0; - virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) = 0; + virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 index) = 0; virtual bool NeedWritePortion(const ui32 index) const = 0; void WriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context); diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h index 71a48e78be6e..a77d172be9e9 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup_portions.h @@ -42,7 +42,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges { virtual ui32 GetWritePortionsCount() const override { return 0; } - virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override { + virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 /*index*/) override { return nullptr; } virtual bool NeedWritePortion(const ui32 /*index*/) const override { diff --git a/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h b/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h index f39d33f5871c..33c7fe34cb1d 100644 --- a/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h +++ b/ydb/core/tx/columnshard/engines/changes/cleanup_tables.h @@ -40,7 +40,7 @@ class TCleanupTablesColumnEngineChanges: public TColumnEngineChanges { virtual ui32 GetWritePortionsCount() const override { return 0; } - virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 /*index*/) override { + virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 /*index*/) override { return nullptr; } virtual bool NeedWritePortion(const ui32 /*index*/) const override { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 87b66aa9473e..8ba3d0891d85 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -270,16 +270,17 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(TConstruc TGeneralSerializedSlice slice(std::move(i)); auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true); - AppendedPortions.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), GranuleMeta->GetPathId(), - resultSchema->GetVersion(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager())); - AppendedPortions.back().FillStatistics(resultSchema->GetIndexInfo()); + auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), GranuleMeta->GetPathId(), + resultSchema->GetVersion(), resultSchema->GetSnapshot(), SaverContext.GetStoragesManager()); + constructor.FillStatistics(resultSchema->GetIndexInfo()); NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); - AppendedPortions.back().GetPortionConstructor().AddMetadata(*resultSchema, deletionsCount, primaryKeys, snapshotKeys); - AppendedPortions.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); + constructor.GetPortionConstructor().AddMetadata(*resultSchema, deletionsCount, primaryKeys, snapshotKeys); + constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); if (shardingActual) { - AppendedPortions.back().GetPortionConstructor().SetShardingVersion(shardingActual->GetSnapshotVersion()); + constructor.GetPortionConstructor().SetShardingVersion(shardingActual->GetSnapshotVersion()); } + AppendedPortions.emplace_back(std::move(constructor)); recordIdx += slice.GetRecordsCount(); } } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index e34fa411a347..6207a8883376 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -213,7 +213,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont if (pathInfo.GetShardingInfo()) { portion.GetPortionConstructor().SetShardingVersion(pathInfo.GetShardingInfo()->GetSnapshotVersion()); } - AppendedPortions.emplace_back(std::move(portion)); + AppendedPortions.emplace_back(TWritePortionInfoWithBlobsResult(std::move(portion))); } } } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index f23c799e3319..fc74dbea0454 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -46,7 +46,7 @@ void TTTLColumnEngineChanges::DoOnFinish(NColumnShard::TColumnShard& self, TChan } } -std::optional TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, +std::optional TTTLColumnEngineChanges::UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, TConstructionContext& context) const { const TPortionInfo& portionInfo = info.GetPortionInfo(); @@ -55,7 +55,7 @@ std::optional TTTLColumnEngineChanges::UpdateEvicted Y_ABORT_UNLESS(portionInfo.GetMeta().GetTierName() != evictFeatures.GetTargetTierName() || blobSchema->GetVersion() < evictFeatures.GetTargetScheme()->GetVersion()); auto portionWithBlobs = TReadPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs, blobSchema->GetIndexInfo()); - std::optional result = TReadPortionInfoWithBlobs::SyncPortion( + std::optional result = TReadPortionInfoWithBlobs::SyncPortion( std::move(portionWithBlobs), blobSchema, evictFeatures.GetTargetScheme(), evictFeatures.GetTargetTierName(), SaverContext.GetStoragesManager(), context.Counters.SplitterCounters); return std::move(result); } diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.h b/ydb/core/tx/columnshard/engines/changes/ttl.h index 92eb0ffa9b3c..b75795e16fe4 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.h +++ b/ydb/core/tx/columnshard/engines/changes/ttl.h @@ -40,7 +40,7 @@ class TTTLColumnEngineChanges: public TChangesWithAppend { } }; - std::optional UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, + std::optional UpdateEvictedPortion(TPortionForEviction& info, NBlobOperations::NRead::TCompositeReadBlobs& srcBlobs, TConstructionContext& context) const; std::vector PortionsToEvict; diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index edc8f8ca86de..090aeca0c15d 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -17,7 +17,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, AFL_VERIFY(usedPortionIds.emplace(portionInfo.GetPortionId()).second)("portion_info", portionInfo.DebugString(true)); portionInfo.SaveToDatabase(context.DBWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false); } - const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobs& item) { + const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) { auto& portionInfo = item.GetPortionResult(); if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId()); @@ -102,7 +102,7 @@ void TChangesWithAppend::DoOnAfterCompile() { } } -std::vector TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr batch, +std::vector TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr batch, const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional& overrideSaver) const { Y_ABORT_UNLESS(batch->num_rows()); @@ -116,7 +116,7 @@ std::vector TChangesWithAppend::MakeAppendedPortions if (overrideSaver) { schema->SetOverrideSerializer(*overrideSaver); } - std::vector out; + std::vector out; { std::vector pages = TBatchSerializedSlice::BuildSimpleSlices(batch, NSplitter::TSplitSettings(), context.Counters.SplitterCounters, schema); std::vector generalPages; @@ -134,10 +134,11 @@ std::vector TChangesWithAppend::MakeAppendedPortions for (auto&& i : packs) { TGeneralSerializedSlice slice(std::move(i)); auto b = batch->Slice(recordIdx, slice.GetRecordsCount()); - out.emplace_back(TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager())); - out.back().FillStatistics(resultSchema->GetIndexInfo()); - out.back().GetPortionConstructor().AddMetadata(*resultSchema, b); - out.back().GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); + auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), pathId, resultSchema->GetVersion(), snapshot, SaverContext.GetStoragesManager()); + constructor.FillStatistics(resultSchema->GetIndexInfo()); + constructor.GetPortionConstructor().AddMetadata(*resultSchema, b); + constructor.GetPortionConstructor().MutableMeta().SetTierName(IStoragesManager::DefaultStorageId); + out.emplace_back(std::move(constructor)); recordIdx += slice.GetRecordsCount(); } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h index 4c5fbc2189c6..59fa8227dbab 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.h +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h @@ -17,7 +17,7 @@ class TChangesWithAppend: public TColumnEngineChanges { virtual void DoWriteIndexOnExecute(NColumnShard::TColumnShard* self, TWriteIndexContext& context) override; virtual void DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) override; virtual void DoStart(NColumnShard::TColumnShard& self) override; - std::vector MakeAppendedPortions(const std::shared_ptr batch, const ui64 granule, + std::vector MakeAppendedPortions(const std::shared_ptr batch, const ui64 granule, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context, const std::optional& overrideSaver) const; virtual void DoDebugString(TStringOutput& out) const override { @@ -61,11 +61,11 @@ class TChangesWithAppend: public TColumnEngineChanges { AFL_VERIFY(PortionsToRemove.emplace(info.GetAddress(), info).second); } - std::vector AppendedPortions; + std::vector AppendedPortions; virtual ui32 GetWritePortionsCount() const override { return AppendedPortions.size(); } - virtual TWritePortionInfoWithBlobs* GetWritePortionInfo(const ui32 index) override { + virtual TWritePortionInfoWithBlobsResult* GetWritePortionInfo(const ui32 index) override { Y_ABORT_UNLESS(index < AppendedPortions.size()); return &AppendedPortions[index]; } diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp index ca5a1505d620..4a13b658a724 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp @@ -105,7 +105,7 @@ bool TReadPortionInfoWithBlobs::ExtractColumnChunks(const ui32 entityId, std::ve return true; } -std::optional TReadPortionInfoWithBlobs::SyncPortion(TReadPortionInfoWithBlobs&& source, +std::optional TReadPortionInfoWithBlobs::SyncPortion(TReadPortionInfoWithBlobs&& source, const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr& storages, std::shared_ptr counters) { if (from->GetVersion() == to->GetVersion() && targetTier == source.GetPortionInfo().GetTierNameDef(IStoragesManager::DefaultStorageId)) { @@ -163,8 +163,7 @@ std::optional TReadPortionInfoWithBlobs::SyncPortion } constructor.MutableMeta().ResetStatisticsStorage(std::move(storage)); - TWritePortionInfoWithBlobs result = TWritePortionInfoWithBlobs::BuildByBlobs(slice.GroupChunksByBlobs(groups), std::move(constructor), storages); - return result; + return TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), std::move(constructor), storages); } const TString& TReadPortionInfoWithBlobs::GetBlobByAddressVerified(const ui32 columnId, const ui32 chunkId) const { diff --git a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h index 6d688db66074..42fcd8a52f8a 100644 --- a/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/read_with_blobs.h @@ -12,7 +12,7 @@ namespace NKikimr::NOlap { class TVersionedIndex; -class TWritePortionInfoWithBlobs; +class TWritePortionInfoWithBlobsResult; class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs { private: @@ -40,7 +40,7 @@ class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs { const TIndexInfo& indexInfo); std::shared_ptr GetBatch(const ISnapshotSchema::TPtr& data, const ISnapshotSchema& result, const std::set& columnNames = {}) const; - static std::optional SyncPortion(TReadPortionInfoWithBlobs&& source, + static std::optional SyncPortion(TReadPortionInfoWithBlobs&& source, const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr& storages, std::shared_ptr counters); diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp index 4fbc7cb3305c..9af4a7c74092 100644 --- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp +++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.cpp @@ -3,9 +3,9 @@ namespace NKikimr::NOlap { -void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs& owner, const std::shared_ptr& chunk) { +void TWritePortionInfoWithBlobsConstructor::TBlobInfo::AddChunk(TWritePortionInfoWithBlobsConstructor& owner, const std::shared_ptr& chunk) { AFL_VERIFY(chunk); - Y_ABORT_UNLESS(!ResultBlob); + Y_ABORT_UNLESS(!Finished); const TString& data = chunk->GetData(); TBlobRangeLink16 bRange(Size, data.size()); @@ -17,14 +17,14 @@ void TWritePortionInfoWithBlobs::TBlobInfo::AddChunk(TWritePortionInfoWithBlobs& chunk->AddIntoPortionBeforeBlob(bRange, owner.GetPortionConstructor()); } -void TWritePortionInfoWithBlobs::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId) { +void TWritePortionInfoWithBlobsResult::TBlobInfo::RegisterBlobId(TWritePortionInfoWithBlobsResult& owner, const TUnifiedBlobId& blobId) const { const TBlobRangeLink16::TLinkId idx = owner.GetPortionConstructor().RegisterBlobId(blobId); for (auto&& i : Chunks) { - owner.GetPortionConstructor().RegisterBlobIdx(i.first, idx); + owner.GetPortionConstructor().RegisterBlobIdx(i, idx); } } -TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector&& chunks, +TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::BuildByBlobs(std::vector&& chunks, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr& operators) { TPortionInfoConstructor constructor(granule); @@ -33,8 +33,8 @@ TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector< return BuildByBlobs(std::move(chunks), std::move(constructor), operators); } -TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector&& chunks, TPortionInfoConstructor&& constructor, const std::shared_ptr& operators) { - TWritePortionInfoWithBlobs result(std::move(constructor)); +TWritePortionInfoWithBlobsConstructor TWritePortionInfoWithBlobsConstructor::BuildByBlobs(std::vector&& chunks, TPortionInfoConstructor&& constructor, const std::shared_ptr& operators) { + TWritePortionInfoWithBlobsConstructor result(std::move(constructor)); for (auto&& blob : chunks) { auto storage = operators->GetOperatorVerified(blob.GetGroupName()); auto blobInfo = result.StartBlob(storage); @@ -45,7 +45,7 @@ TWritePortionInfoWithBlobs TWritePortionInfoWithBlobs::BuildByBlobs(std::vector< return result; } -std::vector> TWritePortionInfoWithBlobs::GetEntityChunks(const ui32 entityId) const { +std::vector> TWritePortionInfoWithBlobsConstructor::GetEntityChunks(const ui32 entityId) const { std::map> sortedChunks; for (auto&& b : GetBlobs()) { for (auto&& i : b.GetChunks()) { @@ -62,7 +62,7 @@ std::vector> TWritePortionInfoWithBlobs::GetE return result; } -void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) { +void TWritePortionInfoWithBlobsConstructor::FillStatistics(const TIndexInfo& index) { NStatistics::TPortionStorage storage; for (auto&& i : index.GetStatisticsByName()) { THashMap>> data; @@ -74,4 +74,23 @@ void TWritePortionInfoWithBlobs::FillStatistics(const TIndexInfo& index) { GetPortionConstructor().MutableMeta().SetStatisticsStorage(std::move(storage)); } +TString TWritePortionInfoWithBlobsResult::GetBlobByRangeVerified(const ui32 entityId, const ui32 chunkIdx) const { + AFL_VERIFY(!!PortionConstructor); + for (auto&& rec : PortionConstructor->GetRecords()) { + if (rec.GetEntityId() != entityId || rec.GetChunkIdx() != chunkIdx) { + continue; + } + for (auto&& i : Blobs) { + for (auto&& c : i.GetChunks()) { + if (c == TChunkAddress(entityId, chunkIdx)) { + return i.GetResultBlob().substr(rec.BlobRange.Offset, rec.BlobRange.Size); + } + } + } + AFL_VERIFY(false); + } + AFL_VERIFY(false); + return ""; +} + } diff --git a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h index 39c3bb885fe4..bb93cc3ae7d5 100644 --- a/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h +++ b/ydb/core/tx/columnshard/engines/portions/write_with_blobs.h @@ -9,7 +9,9 @@ namespace NKikimr::NOlap { -class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs { +class TWritePortionInfoWithBlobsResult; + +class TWritePortionInfoWithBlobsConstructor: public TBasePortionInfoWithBlobs { public: class TBlobInfo { private: @@ -18,9 +20,8 @@ class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs { YDB_READONLY_DEF(TBlobChunks, Chunks); YDB_READONLY_DEF(std::shared_ptr, Operator); std::vector> ChunksOrdered; - mutable std::optional ResultBlob; - void AddChunk(TWritePortionInfoWithBlobs& owner, const std::shared_ptr& chunk); - + bool Finished = false; + void AddChunk(TWritePortionInfoWithBlobsConstructor& owner, const std::shared_ptr& chunk); public: TBlobInfo(const std::shared_ptr& bOperator) : Operator(bOperator) @@ -31,9 +32,9 @@ class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs { class TBuilder { private: TBlobInfo* OwnerBlob; - TWritePortionInfoWithBlobs* OwnerPortion; + TWritePortionInfoWithBlobsConstructor* OwnerPortion; public: - TBuilder(TBlobInfo& blob, TWritePortionInfoWithBlobs& portion) + TBuilder(TBlobInfo& blob, TWritePortionInfoWithBlobsConstructor& portion) : OwnerBlob(&blob) , OwnerPortion(&portion) { } @@ -46,26 +47,32 @@ class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs { } }; - const TString& GetBlob() const { - if (!ResultBlob) { - TString result; - result.reserve(Size); - for (auto&& i : ChunksOrdered) { - result.append(i->GetData()); - } - ResultBlob = std::move(result); + std::vector ExtractChunks() { + std::vector result; + result.reserve(Chunks.size()); + for (auto&& i : Chunks) { + result.emplace_back(i.first); } - return *ResultBlob; + return result; } - void RegisterBlobId(TWritePortionInfoWithBlobs& owner, const TUnifiedBlobId& blobId); + TString ExtractBlob() { + AFL_VERIFY(!Finished); + Finished = true; + TString result; + result.reserve(Size); + for (auto&& i : ChunksOrdered) { + result.append(i->GetData()); + } + ChunksOrdered.clear(); + return result; + } }; private: std::optional PortionConstructor; - std::optional PortionResult; YDB_READONLY_DEF(std::vector, Blobs); - explicit TWritePortionInfoWithBlobs(TPortionInfoConstructor&& portionConstructor) + explicit TWritePortionInfoWithBlobsConstructor(TPortionInfoConstructor&& portionConstructor) : PortionConstructor(std::move(portionConstructor)) { } @@ -73,46 +80,75 @@ class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs { Blobs.emplace_back(TBlobInfo(bOperator)); return TBlobInfo::TBuilder(Blobs.back(), *this); } - + friend class TWritePortionInfoWithBlobsResult; public: std::vector> GetEntityChunks(const ui32 entityId) const; void FillStatistics(const TIndexInfo& index); - static TWritePortionInfoWithBlobs BuildByBlobs(std::vector&& chunks, + static TWritePortionInfoWithBlobsConstructor BuildByBlobs(std::vector&& chunks, const ui64 granule, const ui64 schemaVersion, const TSnapshot& snapshot, const std::shared_ptr& operators); - static TWritePortionInfoWithBlobs BuildByBlobs(std::vector&& chunks, + static TWritePortionInfoWithBlobsConstructor BuildByBlobs(std::vector&& chunks, TPortionInfoConstructor&& constructor, const std::shared_ptr& operators); - const TString& GetBlobByRangeVerified(const ui32 columnId, const ui32 chunkId) const { - for (auto&& b : Blobs) { - auto it = b.GetChunks().find(TChunkAddress(columnId, chunkId)); - if (it == b.GetChunks().end()) { - continue; - } else { - return it->second->GetData(); - } - } - Y_ABORT_UNLESS(false); + std::vector& GetBlobs() { + return Blobs; } - ui64 GetBlobFullSizeVerified(const ui32 columnId, const ui32 chunkId) const { - for (auto&& b : Blobs) { - auto it = b.GetChunks().find(TChunkAddress(columnId, chunkId)); - if (it == b.GetChunks().end()) { - continue; - } else { - return b.GetSize(); - } - } - Y_ABORT_UNLESS(false); + TString DebugString() const { + return TStringBuilder() << "blobs_count=" << Blobs.size() << ";"; } - std::vector& GetBlobs() { - return Blobs; + TPortionInfoConstructor& GetPortionConstructor() { + AFL_VERIFY(!!PortionConstructor); + return *PortionConstructor; } +}; + +class TWritePortionInfoWithBlobsResult { +public: + class TBlobInfo { + private: + using TBlobChunks = std::vector; + YDB_READONLY_DEF(TBlobChunks, Chunks); + const TString ResultBlob; + YDB_READONLY_DEF(std::shared_ptr, Operator); + + public: + ui64 GetSize() const { + return ResultBlob.size(); + } + + TBlobInfo(const TString& blobData, TBlobChunks&& chunks, const std::shared_ptr& stOperator) + : Chunks(std::move(chunks)) + , ResultBlob(blobData) + , Operator(stOperator) + { + + } + + const TString& GetResultBlob() const { + return ResultBlob; + } + + void RegisterBlobId(TWritePortionInfoWithBlobsResult& owner, const TUnifiedBlobId& blobId) const; + }; +private: + std::optional PortionConstructor; + std::optional PortionResult; + YDB_READONLY_DEF(std::vector, Blobs); +public: + TWritePortionInfoWithBlobsResult(TWritePortionInfoWithBlobsConstructor&& constructor) + : PortionConstructor(std::move(constructor.PortionConstructor)) { + for (auto&& i : constructor.Blobs) { + Blobs.emplace_back(i.ExtractBlob(), i.ExtractChunks(), i.GetOperator()); + } + } + + TString GetBlobByRangeVerified(const ui32 entityId, const ui32 chunkIdx) const; + TString DebugString() const { return TStringBuilder() << "blobs_count=" << Blobs.size() << ";"; } @@ -135,7 +171,6 @@ class TWritePortionInfoWithBlobs: public TBasePortionInfoWithBlobs { AFL_VERIFY(!PortionResult); return *PortionConstructor; } - }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 4c07ce331af4..5f89ea7ec15c 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -275,19 +275,27 @@ TString MakeTestBlob(i64 start = 0, i64 end = 100, ui32 step = 1) { return NArrow::SerializeBatchNoCompression(batch); } -void AddIdsToBlobs(std::vector& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { +void AddIdsToBlobs(std::vector& portions, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { for (auto& portion : portions) { - for (auto& rec : portion.GetPortionConstructor().MutableRecords()) { - rec.BlobRange.BlobIdx = portion.GetPortionConstructor().RegisterBlobId(MakeUnifiedBlobId(++step, portion.GetBlobFullSizeVerified(rec.ColumnId, rec.Chunk))); - TString data = portion.GetBlobByRangeVerified(rec.ColumnId, rec.Chunk); - blobs.Add(IStoragesManager::DefaultStorageId, portion.GetPortionConstructor().RestoreBlobRange(rec.BlobRange), std::move(data)); + THashMap blobsData; + for (auto& b : portion.GetBlobs()) { + const auto blobId = MakeUnifiedBlobId(++step, b.GetSize()); + b.RegisterBlobId(portion, blobId); + blobsData.emplace(blobId, b.GetResultBlob()); + } + for (auto&& rec : portion.GetPortionConstructor().GetRecords()) { + auto range = portion.GetPortionConstructor().RestoreBlobRange(rec.BlobRange); + auto it = blobsData.find(range.BlobId); + AFL_VERIFY(it != blobsData.end()); + const TString& data = it->second; + AFL_VERIFY(range.Offset + range.Size <= data.size()); + blobs.Add(IStoragesManager::DefaultStorageId, range, data.substr(range.Offset, range.Size)); } } } -bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, - std::vector&& dataToIndex, NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { - +bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, std::vector&& dataToIndex, + NBlobOperations::NRead::TCompositeReadBlobs& blobs, ui32& step) { for (ui32 i = 0; i < dataToIndex.size(); ++i) { // Commited data always has nonzero planstep (for WriteLoadRead tests) dataToIndex[i].PlanStep = i + 1; diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp index 976c2945d59a..9836a72a60cc 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp @@ -19,9 +19,9 @@ TCompactedWriteController::TCompactedWriteController(const TActorId& dstActor, T } auto* pInfo = changes.GetWritePortionInfo(i); Y_ABORT_UNLESS(pInfo); - TWritePortionInfoWithBlobs& portionWithBlobs = *pInfo; + TWritePortionInfoWithBlobsResult& portionWithBlobs = *pInfo; for (auto&& b : portionWithBlobs.GetBlobs()) { - auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId()))); + auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.GetResultBlob(), changes.MutableBlobsAction().GetWriting(b.GetOperator()->GetStorageId()))); b.RegisterBlobId(portionWithBlobs, task.GetBlobId()); WriteVolume += b.GetSize(); }