Skip to content

Commit

Permalink
Merge bbdba79 into 9d5fb19
Browse files Browse the repository at this point in the history
  • Loading branch information
nsofya authored May 20, 2024
2 parents 9d5fb19 + bbdba79 commit 30b0fef
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 25 deletions.
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ bool TColumnShardScan::ProduceResults() noexcept {
ACFL_DEBUG("stage", "data_format")("batch_size", NArrow::GetTableDataSize(Result->ArrowBatch))("num_rows", numRows)("batch_columns", JoinSeq(",", batch->schema()->field_names()));
}
if (CurrentLastReadKey) {
NArrow::NMerger::TSortableBatchPosition pNew(result.GetLastReadKey(), 0, result.GetLastReadKey()->schema()->field_names(), {}, false);
NArrow::NMerger::TSortableBatchPosition pOld(CurrentLastReadKey, 0, CurrentLastReadKey->schema()->field_names(), {}, false);
NArrow::NMerger::TSortableBatchPosition pNew(result.GetLastReadKey(), 0, result.GetLastReadKey()->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted());
NArrow::NMerger::TSortableBatchPosition pOld(CurrentLastReadKey, 0, CurrentLastReadKey->schema()->field_names(), {}, ReadMetadataRange->IsDescSorted());
AFL_VERIFY(pOld < pNew)("old", pOld.DebugJson().GetStringRobust())("new", pNew.DebugJson().GetStringRobust());
}
CurrentLastReadKey = result.GetLastReadKey();
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
}
}
}
table.AddVersion(version, 0); // Fake value
table.AddVersion(version);
versionsInfo.AddVersion(version, versionInfo);
if (!rowset.Next()) {
return false;
Expand Down Expand Up @@ -170,13 +170,13 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
} else {
Y_ABORT_UNLESS(id > 0);
}
for (const auto& [version, schemaInfo] : preset.GetVersions()) {
for (const auto& [version, schemaInfo] : preset.GetVersionsById()) {
if (schemaInfo.HasSchema()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "index_schema")("preset_id", id)("snapshot", version)("version", schemaInfo.GetSchema().GetVersion());
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, version, schemaInfo.GetSchema());
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
} else {
PrimaryIndex->RegisterSchemaVersion(version, schemaInfo.GetSchema());
PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
}
}
}
Expand Down Expand Up @@ -322,7 +322,7 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
}
}
Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
table.AddVersion(version, 0);
table.AddVersion(version);
}

TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId)
Expand All @@ -340,7 +340,7 @@ bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, co
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
for (auto&& tableVersion : itTable->second.GetVersions()) {
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion.first);
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion);
}
return true;
}
Expand Down
55 changes: 38 additions & 17 deletions ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,42 @@ namespace NKikimr::NColumnShard {
template<class TVersionData>
class TVersionedSchema {
private:
TMap<NOlap::TSnapshot, TVersionData> Versions;
TMap<NOlap::TSnapshot, ui64> Versions;
TMap<ui64, TVersionData> VersionsById;
TMap<ui64, NOlap::TSnapshot> MinVersionById;
public:
bool IsEmpty() const {
return Versions.empty();
return VersionsById.empty();
}

const TMap<NOlap::TSnapshot, TVersionData>& GetVersions() const {
return Versions;
const TMap<ui64, TVersionData>& GetVersionsById() const {
return VersionsById;
}

const TVersionData& GetVersion(const NOlap::TSnapshot& version) const {
const TVersionData* result = nullptr;
for (auto ver : Versions) {
if (ver.first > version) {
break;
}
result = &ver.second;
}
Y_ABORT_UNLESS(!!result);
return *result;
NOlap::TSnapshot GetMinVersionForId(const ui64 sVersion) const {
auto it = MinVersionById.find(sVersion);
Y_ABORT_UNLESS(it != MinVersionById.end());
return it->second;
}

void AddVersion(const NOlap::TSnapshot& version, const TVersionData& versionInfo) {
Versions[version] = versionInfo;
void AddVersion(const NOlap::TSnapshot& snapshot, const TVersionData& versionInfo) {
ui64 ssVersion = 0;
if (versionInfo.HasSchema()) {
ssVersion = versionInfo.GetSchema().GetVersion();
}
auto insertIt = VersionsById.emplace(ssVersion, versionInfo);
if (!insertIt.second) {
if (versionInfo.HasSchema()) {
Y_ABORT_UNLESS(versionInfo.GetSchema().DebugString()== insertIt.first->second.GetSchema().DebugString());
}
};
Y_ABORT_UNLESS(Versions.emplace(snapshot, ssVersion).second);

if (MinVersionById.contains(ssVersion)) {
MinVersionById.emplace(ssVersion, std::min(snapshot, MinVersionById.at(ssVersion)));
} else {
MinVersionById.emplace(ssVersion, snapshot);
}
}
};

Expand Down Expand Up @@ -74,11 +86,12 @@ class TSchemaPreset : public TVersionedSchema<NKikimrTxColumnShard::TSchemaPrese
}
};

class TTableInfo : public TVersionedSchema<ui32> {
class TTableInfo {
public:
ui64 PathId;
TString TieringUsage;
std::optional<NOlap::TSnapshot> DropVersion;
YDB_READONLY_DEF(TSet<NOlap::TSnapshot>, Versions);

public:
const TString& GetTieringUsage() const {
Expand All @@ -90,6 +103,10 @@ class TTableInfo : public TVersionedSchema<ui32> {
return *this;
}

bool IsEmpty() const {
return Versions.empty();
}

ui64 GetPathId() const {
return PathId;
}
Expand All @@ -98,6 +115,10 @@ class TTableInfo : public TVersionedSchema<ui32> {
DropVersion = version;
}

void AddVersion(const NOlap::TSnapshot& snapshot) {
Versions.insert(snapshot);
}

bool IsDropped() const {
return DropVersion.has_value();
}
Expand Down

0 comments on commit 30b0fef

Please sign in to comment.