diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp index 1f5504317aa2..856c4107da8b 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp @@ -20,7 +20,10 @@ std::vector TMerger::Execute(c arrow::FieldVector indexFields; indexFields.emplace_back(IColumnMerger::PortionIdField); indexFields.emplace_back(IColumnMerger::PortionRecordIndexField); - IIndexInfo::AddSpecialFields(indexFields); + if (resultFiltered->HasColumnId((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) { + IIndexInfo::AddDeleteFields(indexFields); + } + IIndexInfo::AddSnapshotFields(indexFields); auto dataSchema = std::make_shared(indexFields); NArrow::NMerger::TMergePartialStream mergeStream( resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames()); @@ -137,7 +140,7 @@ std::vector TMerger::Execute(c TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, Context.Counters.SplitterCounters); auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount()); - const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true); + const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, false); auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups), dataWithSecondary.GetSecondaryInplaceData(), pathId, resultFiltered->GetVersion(), resultFiltered->GetSnapshot(), SaverContext.GetStoragesManager()); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 9fced30f5c9e..242c3939ecc4 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -160,6 +160,9 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont auto blobSchema = context.SchemaVersions.GetSchemaVerified(inserted.GetSchemaVersion()); std::vector filteredIds = inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().GetColumnIds(true)); usageColumnIds.insert(filteredIds.begin(), filteredIds.end()); + if (inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) { + usageColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); + } if (usageColumnIds.size() == resultSchema->GetIndexInfo().GetColumnIds(true).size()) { break; } @@ -179,8 +182,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont } IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot()); - IIndexInfo::AddDeleteFlagsColumn(*batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete); - usageColumnIds.insert(IIndexInfo::GetSystemColumnIds().begin(), IIndexInfo::GetSystemColumnIds().end()); + if (usageColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) { + IIndexInfo::AddDeleteFlagsColumn(*batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete); + } + usageColumnIds.insert(IIndexInfo::GetSnapshotColumnIds().begin(), IIndexInfo::GetSnapshotColumnIds().end()); batch = resultSchema->NormalizeBatch(*blobSchema, batch, usageColumnIds).DetachResult(); pathBatches.Add(inserted, shardingFilterCommit, batch); diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h index 2776bc319a81..43b337c1f120 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h @@ -77,6 +77,10 @@ class IIndexInfo { fields.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64())); } + static void AddDeleteFields(std::vector>& fields) { + fields.push_back(arrow::field(SPEC_COL_DELETE_FLAG, arrow::boolean())); + } + static const std::set& GetSnapshotColumnIdsSet() { static const std::set result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID }; return result;