Skip to content

Commit

Permalink
dont use default columns for merger (ydb-platform#12380)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 808e478 commit 811317d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks(

for (auto&& i : portions) {
auto blobsSchema = i.GetPortionInfo().GetSchema(context.SchemaVersions);
auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds).DetachResult();
auto batch = i.RestoreBatch(*blobsSchema, *resultFiltered, seqDataColumnIds, false).DetachResult();
std::shared_ptr<NArrow::TColumnFilter> filter =
BuildPortionFilter(shardingActual, batch, i.GetPortionInfo(), usedPortionIds, resultFiltered);
merger.AddBatch(batch, filter);
Expand Down
129 changes: 69 additions & 60 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,78 @@
namespace NKikimr::NOlap {

namespace {

void FillDefaultColumn(
TPortionDataAccessor::TColumnAssemblingInfo& column, const TPortionInfo& portionInfo, const TSnapshot& defaultSnapshot) {
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP) {
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(
portionInfo.GetRecordsCount(), std::make_shared<arrow::UInt64Scalar>(defaultSnapshot.GetPlanStep()), false));
}
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::TX_ID) {
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(
portionInfo.GetRecordsCount(), std::make_shared<arrow::UInt64Scalar>(defaultSnapshot.GetTxId()), false));
}
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID) {
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(
portionInfo.GetRecordsCount(), std::make_shared<arrow::UInt64Scalar>((ui64)portionInfo.GetInsertWriteIdVerified()), false));
}
if (column.GetColumnId() == (ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG) {
AFL_VERIFY(portionInfo.GetRecordsCount() == portionInfo.GetMeta().GetDeletionsCount() || portionInfo.GetMeta().GetDeletionsCount() == 0)("deletes",
portionInfo.GetMeta().GetDeletionsCount())("count", portionInfo.GetRecordsCount());
column.AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(
portionInfo.GetRecordsCount(), std::make_shared<arrow::BooleanScalar>((bool)portionInfo.GetMeta().GetDeletionsCount()), true));
}
}

template <class TExternalBlobInfo>
TPortionDataAccessor::TPreparedBatchData PrepareForAssembleImpl(const TPortionDataAccessor& portionData, const TPortionInfo& portionInfo,
const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TExternalBlobInfo>& blobsData,
const std::optional<TSnapshot>& defaultSnapshot) {
const std::optional<TSnapshot>& defaultSnapshot, const bool restoreAbsent) {
std::vector<TPortionDataAccessor::TColumnAssemblingInfo> columns;
columns.reserve(resultSchema.GetColumnIds().size());
const ui32 rowsCount = portionInfo.GetRecordsCount();
auto it = portionData.GetRecordsVerified().begin();

TSnapshot defaultSnapshotLocal = TSnapshot::Zero();
if (portionInfo.HasCommitSnapshot()) {
defaultSnapshotLocal = portionInfo.GetCommitSnapshotVerified();
} else if (defaultSnapshot) {
defaultSnapshotLocal = *defaultSnapshot;
}

for (auto&& i : resultSchema.GetColumnIds()) {
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i), resultSchema.GetColumnLoaderVerified(i));
if (portionInfo.HasInsertWriteId()) {
if (portionInfo.HasCommitSnapshot()) {
if (i == (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP) {
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
std::make_shared<arrow::UInt64Scalar>(portionInfo.GetCommitSnapshotVerified().GetPlanStep()), false));
}
if (i == (ui32)IIndexInfo::ESpecialColumn::TX_ID) {
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
std::make_shared<arrow::UInt64Scalar>(portionInfo.GetCommitSnapshotVerified().GetPlanStep()), false));
}
} else {
if (i == (ui32)IIndexInfo::ESpecialColumn::PLAN_STEP) {
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
std::make_shared<arrow::UInt64Scalar>(defaultSnapshot ? defaultSnapshot->GetPlanStep() : 0)));
}
if (i == (ui32)IIndexInfo::ESpecialColumn::TX_ID) {
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
std::make_shared<arrow::UInt64Scalar>(defaultSnapshot ? defaultSnapshot->GetTxId() : 0)));
}
}
if (i == (ui32)IIndexInfo::ESpecialColumn::WRITE_ID) {
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
std::make_shared<arrow::UInt64Scalar>((ui64)portionInfo.GetInsertWriteIdVerified()), false));
}
if (i == (ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG) {
columns.back().AddBlobInfo(0, portionInfo.GetRecordsCount(),
TPortionDataAccessor::TAssembleBlobInfo(portionInfo.GetRecordsCount(),
std::make_shared<arrow::BooleanScalar>((bool)portionInfo.GetMeta().GetDeletionsCount()), true));
}
while (it != portionData.GetRecordsVerified().end() && it->GetColumnId() < i) {
++it;
continue;
}
}
{
int skipColumnId = -1;
TPortionDataAccessor::TColumnAssemblingInfo* currentAssembler = nullptr;
for (auto& rec : portionData.GetRecordsVerified()) {
if (skipColumnId == (int)rec.ColumnId) {
continue;
if ((it == portionData.GetRecordsVerified().end() || i < it->GetColumnId())) {
if (restoreAbsent || IIndexInfo::IsSpecialColumn(i)) {
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i), resultSchema.GetColumnLoaderVerified(i));
}
if (!currentAssembler || rec.ColumnId != currentAssembler->GetColumnId()) {
const i32 resultPos = resultSchema.GetFieldIndex(rec.ColumnId);
if (resultPos < 0) {
skipColumnId = rec.ColumnId;
continue;
}
AFL_VERIFY((ui32)resultPos < columns.size());
currentAssembler = &columns[resultPos];
if (!portionInfo.HasInsertWriteId()) {
continue;
}
auto it = blobsData.find(rec.GetAddress());
AFL_VERIFY(it != blobsData.end())("size", blobsData.size())("address", rec.GetAddress().DebugString());
currentAssembler->AddBlobInfo(rec.Chunk, rec.GetMeta().GetRecordsCount(), std::move(it->second));
blobsData.erase(it);
FillDefaultColumn(columns.back(), portionInfo, defaultSnapshotLocal);
}
if (it == portionData.GetRecordsVerified().end()) {
continue;
} else if (it->GetColumnId() != i) {
AFL_VERIFY(i < it->GetColumnId());
continue;
}
columns.emplace_back(rowsCount, dataSchema.GetColumnLoaderOptional(i), resultSchema.GetColumnLoaderVerified(i));
while (it != portionData.GetRecordsVerified().end() && it->GetColumnId() == i) {
auto itBlobs = blobsData.find(it->GetAddress());
AFL_VERIFY(itBlobs != blobsData.end())("size", blobsData.size())("address", it->GetAddress().DebugString());
columns.back().AddBlobInfo(it->Chunk, it->GetMeta().GetRecordsCount(), std::move(itBlobs->second));
blobsData.erase(itBlobs);

++it;
continue;
}
}

Expand All @@ -98,14 +105,15 @@ TPortionDataAccessor::TPreparedBatchData PrepareForAssembleImpl(const TPortionDa
} // namespace

TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssemble(const ISnapshotSchema& dataSchema,
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot) const {
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot);
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot,
const bool restoreAbsent) const {
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
}

TPortionDataAccessor::TPreparedBatchData TPortionDataAccessor::PrepareForAssemble(const ISnapshotSchema& dataSchema,
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData,
const std::optional<TSnapshot>& defaultSnapshot) const {
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot);
const ISnapshotSchema& resultSchema, THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData, const std::optional<TSnapshot>& defaultSnapshot,
const bool restoreAbsent) const {
return PrepareForAssembleImpl(*this, *PortionInfo, dataSchema, resultSchema, blobsData, defaultSnapshot, restoreAbsent);
}

void TPortionDataAccessor::FillBlobRangesByStorage(THashMap<TString, THashSet<TBlobRange>>& result, const TVersionedIndex& index) const {
Expand Down Expand Up @@ -738,6 +746,7 @@ std::shared_ptr<NArrow::NAccessor::TDeserializeChunkedArray> TPortionDataAccesso
NArrow::NAccessor::TDeserializeChunkedArray::TChunk TPortionDataAccessor::TAssembleBlobInfo::BuildDeserializeChunk(
const std::shared_ptr<TColumnLoader>& loader) const {
if (DefaultRowsCount) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "build_trivial");
Y_ABORT_UNLESS(!Data);
auto col = std::make_shared<NArrow::NAccessor::TTrivialArray>(
NArrow::TThreadSimpleArraysCache::Get(loader->GetField()->type(), DefaultValue, DefaultRowsCount));
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/portions/data_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,10 @@ class TPortionDataAccessor {
};

TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema,
THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt) const;
THashMap<TChunkAddress, TString>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt,
const bool restoreAbsent = true) const;
TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema,
THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt) const;
THashMap<TChunkAddress, TAssembleBlobInfo>& blobsData, const std::optional<TSnapshot>& defaultSnapshot = std::nullopt, const bool restoreAbsent = true) const;

class TPage {
private:
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/portions/read_with_blobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ void TReadPortionInfoWithBlobs::RestoreChunk(const std::shared_ptr<IPortionDataC
}

TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> TReadPortionInfoWithBlobs::RestoreBatch(
const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns) const {
const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns, const bool restoreAbsent) const {
THashMap<TChunkAddress, TString> blobs;
NActors::TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("portion_id", PortionInfo.GetPortionInfo().GetPortionId());
for (auto&& i : PortionInfo.GetRecordsVerified()) {
blobs[i.GetAddress()] = GetBlobByAddressVerified(i.ColumnId, i.Chunk);
Y_ABORT_UNLESS(blobs[i.GetAddress()].size() == i.BlobRange.Size);
}
return PortionInfo.PrepareForAssemble(data, resultSchema, blobs).AssembleToGeneralContainer(seqColumns);
return PortionInfo.PrepareForAssemble(data, resultSchema, blobs, {}, restoreAbsent).AssembleToGeneralContainer(seqColumns);
}

TReadPortionInfoWithBlobs TReadPortionInfoWithBlobs::RestorePortion(
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/engines/portions/read_with_blobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class TReadPortionInfoWithBlobs: public TBasePortionInfoWithBlobs {
const TPortionDataAccessor& portion, NBlobOperations::NRead::TCompositeReadBlobs& blobs,
const TIndexInfo& indexInfo);

TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> RestoreBatch(const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns) const;
TConclusion<std::shared_ptr<NArrow::TGeneralContainer>> RestoreBatch(
const ISnapshotSchema& data, const ISnapshotSchema& resultSchema, const std::set<ui32>& seqColumns, const bool restoreAbsent = true) const;
static std::optional<TWritePortionInfoWithBlobsResult> SyncPortion(TReadPortionInfoWithBlobs&& source,
const ISnapshotSchema::TPtr& from, const ISnapshotSchema::TPtr& to, const TString& targetTier, const std::shared_ptr<IStoragesManager>& storages,
std::shared_ptr<NColumnShard::TSplitterCounters> counters);
Expand Down

0 comments on commit 811317d

Please sign in to comment.