From 868ecfc753c9d17b1925a96596ba7bd3c45377d2 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 6 May 2024 06:55:35 +0300 Subject: [PATCH 1/2] fix read checker (#4301) --- ydb/core/tx/columnshard/engines/reader/actor/actor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index 27006920132b..878153f52baf 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -245,8 +245,8 @@ bool TColumnShardScan::ProduceResults() noexcept { ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetTableDataSize(Result->ArrowBatch))("num_rows", numRows)("batch_columns", JoinSeq(",", batch->schema()->field_names())); } if (CurrentLastReadKey) { - NArrow::NMerger::TSortableBatchPosition pNew(result.GetLastReadKey(), 0, result.GetLastReadKey()->schema()->field_names(), {}, false); - NArrow::NMerger::TSortableBatchPosition pOld(CurrentLastReadKey, 0, CurrentLastReadKey->schema()->field_names(), {}, false); + NArrow::NMerger::TSortableBatchPosition pNew(result.GetLastReadKey(), 0, result.GetLastReadKey()->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted()); + NArrow::NMerger::TSortableBatchPosition pOld(CurrentLastReadKey, 0, CurrentLastReadKey->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted()); AFL_VERIFY(pOld < pNew)("old", pOld.DebugJson().GetStringRobust())("new", pNew.DebugJson().GetStringRobust()); } CurrentLastReadKey = result.GetLastReadKey(); From bbdba79d2c62c116b57068c7124d661da0bf0567 Mon Sep 17 00:00:00 2001 From: nsofya Date: Mon, 20 May 2024 18:57:30 +0300 Subject: [PATCH 2/2] Remove duplicated schema versions on tables load. --- ydb/core/tx/columnshard/tables_manager.cpp | 12 ++--- ydb/core/tx/columnshard/tables_manager.h | 55 +++++++++++++++------- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index cdc1d6da4ea1..027c7c3f3599 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -130,7 +130,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { } } } - table.AddVersion(version, 0); // Fake value + table.AddVersion(version); versionsInfo.AddVersion(version, versionInfo); if (!rowset.Next()) { return false; @@ -170,13 +170,13 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { } else { Y_ABORT_UNLESS(id > 0); } - for (const auto& [version, schemaInfo] : preset.GetVersions()) { + for (const auto& [version, schemaInfo] : preset.GetVersionsById()) { if (schemaInfo.HasSchema()) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "index_schema")("preset_id", id)("snapshot", version)("version", schemaInfo.GetSchema().GetVersion()); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique(TabletId, StoragesManager, version, schemaInfo.GetSchema()); + PrimaryIndex = std::make_unique(TabletId, StoragesManager, preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema()); } else { - PrimaryIndex->RegisterSchemaVersion(version, schemaInfo.GetSchema()); + PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema()); } } } @@ -322,7 +322,7 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& } } Schema::SaveTableVersionInfo(db, pathId, version, versionInfo); - table.AddVersion(version, 0); + table.AddVersion(version); } TTablesManager::TTablesManager(const std::shared_ptr& storagesManager, const ui64 tabletId) @@ -340,7 +340,7 @@ bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, co const auto& itTable = Tables.find(pathId); AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId); for (auto&& tableVersion : itTable->second.GetVersions()) { - NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion.first); + NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion); } return true; } diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 34744464d978..f8ba1916c0b5 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -16,30 +16,42 @@ namespace NKikimr::NColumnShard { template class TVersionedSchema { private: - TMap Versions; + TMap Versions; + TMap VersionsById; + TMap MinVersionById; public: bool IsEmpty() const { - return Versions.empty(); + return VersionsById.empty(); } - const TMap& GetVersions() const { - return Versions; + const TMap& GetVersionsById() const { + return VersionsById; } - const TVersionData& GetVersion(const NOlap::TSnapshot& version) const { - const TVersionData* result = nullptr; - for (auto ver : Versions) { - if (ver.first > version) { - break; - } - result = &ver.second; - } - Y_ABORT_UNLESS(!!result); - return *result; + NOlap::TSnapshot GetMinVersionForId(const ui64 sVersion) const { + auto it = MinVersionById.find(sVersion); + Y_ABORT_UNLESS(it != MinVersionById.end()); + return it->second; } - void AddVersion(const NOlap::TSnapshot& version, const TVersionData& versionInfo) { - Versions[version] = versionInfo; + void AddVersion(const NOlap::TSnapshot& snapshot, const TVersionData& versionInfo) { + ui64 ssVersion = 0; + if (versionInfo.HasSchema()) { + ssVersion = versionInfo.GetSchema().GetVersion(); + } + auto insertIt = VersionsById.emplace(ssVersion, versionInfo); + if (!insertIt.second) { + if (versionInfo.HasSchema()) { + Y_ABORT_UNLESS(versionInfo.GetSchema().DebugString()== insertIt.first->second.GetSchema().DebugString()); + } + }; + Y_ABORT_UNLESS(Versions.emplace(snapshot, ssVersion).second); + + if (MinVersionById.contains(ssVersion)) { + MinVersionById.emplace(ssVersion, std::min(snapshot, MinVersionById.at(ssVersion))); + } else { + MinVersionById.emplace(ssVersion, snapshot); + } } }; @@ -74,11 +86,12 @@ class TSchemaPreset : public TVersionedSchema { +class TTableInfo { public: ui64 PathId; TString TieringUsage; std::optional DropVersion; + YDB_READONLY_DEF(TSet, Versions); public: const TString& GetTieringUsage() const { @@ -90,6 +103,10 @@ class TTableInfo : public TVersionedSchema { return *this; } + bool IsEmpty() const { + return Versions.empty(); + } + ui64 GetPathId() const { return PathId; } @@ -98,6 +115,10 @@ class TTableInfo : public TVersionedSchema { DropVersion = version; } + void AddVersion(const NOlap::TSnapshot& snapshot) { + Versions.insert(snapshot); + } + bool IsDropped() const { return DropVersion.has_value(); }