Skip to content

Commit

Permalink
Merge c8b77a8 into 34b6297
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 26, 2024
2 parents 34b6297 + c8b77a8 commit bc277d9
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
arrow::FieldVector indexFields;
indexFields.emplace_back(IColumnMerger::PortionIdField);
indexFields.emplace_back(IColumnMerger::PortionRecordIndexField);
IIndexInfo::AddSpecialFields(indexFields);
if (resultFiltered->HasColumnId((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
IIndexInfo::AddDeleteFields(indexFields);
}
IIndexInfo::AddSnapshotFields(indexFields);
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
NArrow::NMerger::TMergePartialStream mergeStream(
resultFiltered->GetIndexInfo().GetReplaceKey(), dataSchema, false, IIndexInfo::GetSnapshotColumnNames());
Expand Down Expand Up @@ -137,7 +140,7 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
TGeneralSerializedSlice slice(dataWithSecondary.GetExternalData(), schemaDetails, Context.Counters.SplitterCounters);

auto b = batchResult->Slice(recordIdx, slice.GetRecordsCount());
const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, true);
const ui32 deletionsCount = IIndexInfo::CalcDeletions(b, false);
auto constructor = TWritePortionInfoWithBlobsConstructor::BuildByBlobs(slice.GroupChunksByBlobs(groups),
dataWithSecondary.GetSecondaryInplaceData(), pathId, resultFiltered->GetVersion(), resultFiltered->GetSnapshot(),
SaverContext.GetStoragesManager());
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
auto blobSchema = context.SchemaVersions.GetSchemaVerified(inserted.GetSchemaVersion());
std::vector<ui32> filteredIds = inserted.GetMeta().GetSchemaSubset().Apply(blobSchema->GetIndexInfo().GetColumnIds(true));
usageColumnIds.insert(filteredIds.begin(), filteredIds.end());
if (inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
usageColumnIds.emplace((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
}
if (usageColumnIds.size() == resultSchema->GetIndexInfo().GetColumnIds(true).size()) {
break;
}
Expand All @@ -179,8 +182,10 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
}

IIndexInfo::AddSnapshotColumns(*batch, inserted.GetSnapshot());
IIndexInfo::AddDeleteFlagsColumn(*batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete);
usageColumnIds.insert(IIndexInfo::GetSystemColumnIds().begin(), IIndexInfo::GetSystemColumnIds().end());
if (usageColumnIds.contains((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG)) {
IIndexInfo::AddDeleteFlagsColumn(*batch, inserted.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete);
}
usageColumnIds.insert(IIndexInfo::GetSnapshotColumnIds().begin(), IIndexInfo::GetSnapshotColumnIds().end());

batch = resultSchema->NormalizeBatch(*blobSchema, batch, usageColumnIds).DetachResult();
pathBatches.Add(inserted, shardingFilterCommit, batch);
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class IIndexInfo {
fields.push_back(arrow::field(SPEC_COL_TX_ID, arrow::uint64()));
}

static void AddDeleteFields(std::vector<std::shared_ptr<arrow::Field>>& fields) {
fields.push_back(arrow::field(SPEC_COL_DELETE_FLAG, arrow::boolean()));
}

static const std::set<ui32>& GetSnapshotColumnIdsSet() {
static const std::set<ui32> result = { (ui32)ESpecialColumn::PLAN_STEP, (ui32)ESpecialColumn::TX_ID };
return result;
Expand Down

0 comments on commit bc277d9

Please sign in to comment.