Skip to content

Commit

Permalink
Merge 330a116 into 1b4620e
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jun 3, 2024
2 parents 1b4620e + 330a116 commit 788d980
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 13 deletions.
13 changes: 8 additions & 5 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
filler(1100000, 300100000, 10000);

}
const ui64 initCount = csController->GetActualizationRefreshSchemeCount().Val();
AFL_VERIFY(initCount == 3)("started_value", initCount);

for (ui32 i = 0; i < 10; ++i) {
auto alterQuery = TStringBuilder() <<
Expand All @@ -144,8 +146,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
const ui64 startCount = csController->GetActualizationRefreshSchemeCount().Val();
AFL_VERIFY(startCount == 30);
const ui64 updatesCount = csController->GetActualizationRefreshSchemeCount().Val();
AFL_VERIFY(updatesCount == 30 + initCount)("after_modification", updatesCount);

for (auto&& i : csController->GetShardActualIds()) {
kikimr.GetTestServer().GetRuntime()->Send(MakePipePerNodeCacheID(false), NActors::TActorId(), new TEvPipeCache::TEvForward(
Expand All @@ -165,8 +167,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
CompareYson(result, R"([[20000u;]])");
}

AFL_VERIFY(startCount + 3 /*tables count*/ * 3 /*2 * normalizers + main_load*/ ==
(ui64)csController->GetActualizationRefreshSchemeCount().Val())("start", startCount)("count", csController->GetActualizationRefreshSchemeCount().Val());
AFL_VERIFY(updatesCount + 3 /*tablets count*/ * 3 /*2 * normalizers + main_load*/ ==
(ui64)csController->GetActualizationRefreshSchemeCount().Val())("updates", updatesCount)("count", csController->GetActualizationRefreshSchemeCount().Val());
}

Y_UNIT_TEST(Indexes) {
Expand Down Expand Up @@ -308,7 +310,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
CompareYson(result, R"([[1u;]])");
}

AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.20 * csController->GetIndexesSkippingOnSelect().Val());
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.20 * csController->GetIndexesSkippingOnSelect().Val())
("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val());

}

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
tableVerProto.SetSchemaPresetId(preset.GetId());

if (TablesManager.RegisterSchemaPreset(preset, db)) {
TablesManager.AddSchemaVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db);
TablesManager.AddSchemaVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db, Tiers);
}
} else {
Y_ABORT_UNLESS(tableProto.HasSchema(), "Tables has either schema or preset");
Expand Down Expand Up @@ -442,7 +442,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
NKikimrTxColumnShard::TTableVersionInfo tableVerProto;
if (alterProto.HasSchemaPreset()) {
tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId());
TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db);
TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db, Tiers);
} else if (alterProto.HasSchema()) {
*tableVerProto.MutableSchema() = alterProto.GetSchema();
}
Expand Down Expand Up @@ -501,7 +501,7 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
if (!TablesManager.HasPreset(presetProto.GetId())) {
continue; // we don't update presets that we don't use
}
TablesManager.AddSchemaVersion(presetProto.GetId(), version, presetProto.GetSchema(), db);
TablesManager.AddSchemaVersion(presetProto.GetId(), version, presetProto.GetSchema(), db, Tiers);
}
}

Expand Down Expand Up @@ -751,14 +751,15 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
ACFL_DEBUG("background", "ttl")("path", i.first)("info", i.second.GetDebugString());
}

auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
const ui64 memoryUsageLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetTieringsMemoryLimit() : ((ui64)512 * 1024 * 1024);
std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, DataLocksManager, memoryUsageLimit);

if (indexChanges.empty()) {
ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes");
return false;
}

auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
for (auto&& i : indexChanges) {
const TString externalTaskId = i->GetTaskIdentifier();
const bool needWrites = i->NeedConstruction();
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc
return true;
}

void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) {
void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager) {
Y_ABORT_UNLESS(SchemaPresetsIds.contains(presetId));

TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
Expand All @@ -285,6 +285,9 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
if (manager->IsReady()) {
PrimaryIndex->OnTieringModified(manager, Ttl, {});
}
} else {
PrimaryIndex->RegisterSchemaVersion(version, schema);
}
Expand All @@ -310,10 +313,10 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
if (SchemaPresetsIds.empty()) {
TSchemaPreset fakePreset;
Y_ABORT_UNLESS(RegisterSchemaPreset(fakePreset, db));
AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db, manager);
} else {
Y_ABORT_UNLESS(SchemaPresetsIds.contains(fakePreset.GetId()));
AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db, manager);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class TTablesManager {
void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db);
bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);

void AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
void AddSchemaVersion(const ui32 presetId, const NOlap::TSnapshot& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager);
void AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& version, const NKikimrTxColumnShard::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db, std::shared_ptr<TTiersManager>& manager);
bool FillMonitoringReport(NTabletFlatExecutor::TTransactionContext& txc, NJson::TJsonValue& json);

Expand Down

0 comments on commit 788d980

Please sign in to comment.