Skip to content

Commit

Permalink
Tables manager mem (ydb-platform#4692)
Browse files Browse the repository at this point in the history
Co-authored-by: nsofya <nsofya@yandex.ru>
  • Loading branch information
nsofya and nsofya authored May 21, 2024
1 parent e02e0d6 commit f208fbb
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 73 deletions.
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
<< " ttl settings: " << tableProto.GetTtlSettings()
<< " at tablet " << TabletID());

TTableInfo::TTableVersionInfo tableVerProto;
NKikimrTxColumnShard::TTableVersionInfo tableVerProto;
tableVerProto.SetPathId(pathId);

// check schema changed
Expand Down Expand Up @@ -439,7 +439,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
<< " ttl settings: " << alterProto.GetTtlSettings()
<< " at tablet " << TabletID());

TTableInfo::TTableVersionInfo tableVerProto;
NKikimrTxColumnShard::TTableVersionInfo tableVerProto;
if (alterProto.HasSchemaPreset()) {
tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId());
TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db);
Expand Down
74 changes: 42 additions & 32 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,18 @@ bool TTablesManager::FillMonitoringReport(NTabletFlatExecutor::TTransactionConte
}
}
json.InsertValue("tables_count", Tables.size());
json.InsertValue("presets_count", SchemaPresets.size());
json.InsertValue("presets_count", SchemaPresetsIds.size());
json.InsertValue("to_drop_count", PathsToDrop.size());
return true;
}

bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
using TTableVersionsInfo = TVersionedSchema<NKikimrTxColumnShard::TTableVersionInfo>;

THashMap<ui32, TSchemaPreset> schemaPresets;
THashMap<ui32, TTableVersionsInfo> tableVersions;
{
TMemoryProfileGuard g("TTablesManager/InitFromDB::Tables");
auto rowset = db.Table<Schema::TableInfo>().Select();
if (!rowset.IsReady()) {
return false;
Expand All @@ -57,6 +62,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
if (table.IsDropped()) {
PathsToDrop.insert(table.GetPathId());
}

AFL_VERIFY(tableVersions.emplace(table.GetPathId(), TTableVersionsInfo()).second);
AFL_VERIFY(Tables.emplace(table.GetPathId(), std::move(table)).second);

if (!rowset.Next()) {
Expand All @@ -67,6 +74,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {

bool isFakePresetOnly = true;
{
TMemoryProfileGuard g("TTablesManager/InitFromDB::SchemaPresets");
auto rowset = db.Table<Schema::SchemaPresetInfo>().Select();
if (!rowset.IsReady()) {
return false;
Expand All @@ -82,14 +90,16 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
Y_VERIFY_S(preset.GetName() == "default", "Preset name: " + preset.GetName());
isFakePresetOnly = false;
}
AFL_VERIFY(SchemaPresets.emplace(preset.GetId(), preset).second);
AFL_VERIFY(schemaPresets.emplace(preset.GetId(), preset).second);
AFL_VERIFY(SchemaPresetsIds.emplace(preset.GetId()).second);
if (!rowset.Next()) {
return false;
}
}
}

{
TMemoryProfileGuard g("TTablesManager/InitFromDB::Versions");
auto rowset = db.Table<Schema::TableVersionInfo>().Select();
if (!rowset.IsReady()) {
return false;
Expand All @@ -101,13 +111,14 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
Y_ABORT_UNLESS(Tables.contains(pathId));
NOlap::TSnapshot version(
rowset.GetValue<Schema::TableVersionInfo::SinceStep>(),
rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());
rowset.GetValue<Schema::TableVersionInfo::SinceTxId>());

auto& table = Tables.at(pathId);
TTableInfo::TTableVersionInfo versionInfo;
auto& table = Tables[pathId];
auto& versionsInfo = tableVersions[pathId];
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "load_table_version")("path_id", pathId)("snapshot", version)("version", versionInfo.HasSchema() ? versionInfo.GetSchema().GetVersion() : -1);
Y_ABORT_UNLESS(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
Y_ABORT_UNLESS(schemaPresets.contains(versionInfo.GetSchemaPresetId()));

if (!table.IsDropped()) {
auto& ttlSettings = versionInfo.GetTtlSettings();
Expand All @@ -120,23 +131,25 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
}
}
}
table.AddVersion(version, versionInfo);
table.AddVersion(version);
versionsInfo.AddVersion(version, versionInfo);
if (!rowset.Next()) {
return false;
}
}
}

{
TMemoryProfileGuard g("TTablesManager/InitFromDB::PresetVersions");
auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
Y_ABORT_UNLESS(SchemaPresets.contains(id));
auto& preset = SchemaPresets.at(id);
Y_ABORT_UNLESS(schemaPresets.contains(id));
auto& preset = schemaPresets[id];
NOlap::TSnapshot version(
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(),
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());
Expand All @@ -151,19 +164,20 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
}
}

for (const auto& [id, preset] : SchemaPresets) {
TMemoryProfileGuard g("TTablesManager/InitFromDB::Other");
for (const auto& [id, preset] : schemaPresets) {
if (isFakePresetOnly) {
Y_ABORT_UNLESS(id == 0);
} else {
Y_ABORT_UNLESS(id > 0);
}
for (const auto& [version, schemaInfo] : preset.GetVersions()) {
for (const auto& [version, schemaInfo] : preset.GetVersionsById()) {
if (schemaInfo.HasSchema()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "index_schema")("preset_id", id)("snapshot", version)("version", schemaInfo.GetSchema().GetVersion());
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, version, schemaInfo.GetSchema());
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
} else {
PrimaryIndex->RegisterSchemaVersion(version, schemaInfo.GetSchema());
PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInfo.GetSchema());
}
}
}
Expand Down Expand Up @@ -199,7 +213,7 @@ bool TTablesManager::IsReadyForWrite(const ui64 pathId) const {
}

bool TTablesManager::HasPreset(const ui32 presetId) const {
return SchemaPresets.contains(presetId);
return SchemaPresetsIds.contains(presetId);
}

const TTableInfo& TTablesManager::GetTable(const ui64 pathId) const {
Expand All @@ -211,26 +225,25 @@ ui64 TTablesManager::GetMemoryUsage() const {
ui64 memory =
Tables.size() * sizeof(TTableInfo) +
PathsToDrop.size() * sizeof(ui64) +
Ttl.PathsCount() * sizeof(TTtl::TDescription) +
SchemaPresets.size() * sizeof(TSchemaPreset);
Ttl.PathsCount() * sizeof(TTtl::TDescription);
if (PrimaryIndex) {
memory += PrimaryIndex->MemoryUsage();
}
return memory;
}

void TTablesManager::DropTable(const ui64 pathId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db) {
auto& table = Tables.at(pathId);
AFL_VERIFY(Tables.contains(pathId));
auto& table = Tables[pathId];
table.SetDropVersion(version);
PathsToDrop.insert(pathId);
Ttl.DropPathTtl(pathId);
Schema::SaveTableDropVersion(db, pathId, version.GetPlanStep(), version.GetTxId());
}

void TTablesManager::DropPreset(const ui32 presetId, const NOlap::TSnapshot& version, NIceDb::TNiceDb& db) {
auto& preset = SchemaPresets.at(presetId);
Y_ABORT_UNLESS(preset.GetName() != "default", "Cannot drop the default preset");
preset.SetDropVersion(version);
AFL_VERIFY(SchemaPresetsIds.contains(presetId));
SchemaPresetsIds.erase(presetId);
Schema::SaveSchemaPresetDropVersion(db, presetId, version);
}

Expand All @@ -247,27 +260,24 @@ void TTablesManager::RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db) {
}

bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db) {
if (SchemaPresets.contains(schemaPreset.GetId())) {
if (SchemaPresetsIds.contains(schemaPreset.GetId())) {
return false;
}
SchemaPresetsIds.emplace(schemaPreset.GetId());
Schema::SaveSchemaPresetInfo(db, schemaPreset.GetId(), schemaPreset.GetName());
SchemaPresets.emplace(schemaPreset.GetId(), schemaPreset);
return true;
}

void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) {
Y_ABORT_UNLESS(SchemaPresets.contains(presetId));
auto preset = SchemaPresets.at(presetId);
Y_ABORT_UNLESS(SchemaPresetsIds.contains(presetId));

TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
versionInfo.SetId(presetId);
versionInfo.SetSinceStep(version.GetPlanStep());
versionInfo.SetSinceTxId(version.GetTxId());
*versionInfo.MutableSchema() = schema;

auto& schemaPreset = SchemaPresets.at(presetId);
Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
schemaPreset.AddVersion(version, versionInfo);
if (versionInfo.HasSchema()) {
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, StoragesManager, version, schema);
Expand All @@ -283,21 +293,21 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho
}
}

void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager) {
void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const NKikimrTxColumnShard::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager) {
auto it = Tables.find(pathId);
AFL_VERIFY(it != Tables.end());
auto& table = it->second;

if (versionInfo.HasSchemaPresetId()) {
Y_ABORT_UNLESS(SchemaPresets.contains(versionInfo.GetSchemaPresetId()));
Y_ABORT_UNLESS(SchemaPresetsIds.contains(versionInfo.GetSchemaPresetId()));
} else if (versionInfo.HasSchema()) {
TSchemaPreset fakePreset;
if (SchemaPresets.empty()) {
if (SchemaPresetsIds.empty()) {
TSchemaPreset fakePreset;
Y_ABORT_UNLESS(RegisterSchemaPreset(fakePreset, db));
AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
} else {
Y_ABORT_UNLESS(SchemaPresets.contains(fakePreset.GetId()));
Y_ABORT_UNLESS(SchemaPresetsIds.contains(fakePreset.GetId()));
AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
}
}
Expand All @@ -314,7 +324,7 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
}
}
Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
table.AddVersion(version, versionInfo);
table.AddVersion(version);
}

TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager, const ui64 tabletId)
Expand All @@ -332,7 +342,7 @@ bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, co
const auto& itTable = Tables.find(pathId);
AFL_VERIFY(itTable != Tables.end())("problem", "No schema for path")("path_id", pathId);
for (auto&& tableVersion : itTable->second.GetVersions()) {
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion.first);
NColumnShard::Schema::EraseTableVersionInfo(db, pathId, tableVersion);
}
return true;
}
Expand Down
Loading

0 comments on commit f208fbb

Please sign in to comment.