Skip to content

Commit

Permalink
Merge 6a1b7ab into 916a423
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 3, 2025
2 parents 916a423 + 6a1b7ab commit 952590a
Show file tree
Hide file tree
Showing 33 changed files with 408 additions and 258 deletions.
1 change: 0 additions & 1 deletion ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
, TableClient(Kikimr.GetTableClient()) {
CSController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
CSController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
CSController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
}

void WaitResharding(const TString& hint = "") {
Expand Down
81 changes: 44 additions & 37 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideMemoryLimitForPortionReading(1e+10);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());

Expand Down Expand Up @@ -103,7 +102,6 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());

TLocalHelper(kikimr).CreateTestOlapTableWithoutStore();
Expand Down Expand Up @@ -341,20 +339,36 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
public:
TTestIndexesScenario& Initialize() {
Settings = TKikimrSettings().SetWithSampleTables(false);
Settings.AppConfig.MutableColumnShardConfig()->SetReaderClassName("SIMPLE");
Kikimr = std::make_unique<TKikimrRunner>(Settings);
return *this;
}

void Execute() {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideMemoryLimitForPortionReading(1e+10);
csController->SetOverrideBlobSplitSettings(NOlap::NSplitter::TSplitSettings());
TLocalHelper(*Kikimr).CreateTestOlapTable();
auto tableClient = Kikimr->GetTableClient();

// Tests::NCommon::TLoggerInit(kikimr).Initialize();
/*
Tests::NCommon::TLoggerInit(*Kikimr)
.SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS")
.SetPriority(NActors::NLog::PRI_DEBUG)
.Initialize();
*/

{
auto alterQuery =
TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, `COMPACTION_PLANNER.CLASS_NAME`=`lc-buckets`, `COMPACTION_PLANNER.FEATURES`=`
{"levels" : [{"class_name" : "Zero", "portions_live_duration" : "10s", "expected_blobs_size" : 2048000, "portions_count_available" : 1},
{"class_name" : "Zero"}]}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery =
TStringBuilder() << Sprintf(
Expand Down Expand Up @@ -394,15 +408,13 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
std::vector<ui32> levels;

{
for (ui32 i = 0; i < 2; ++i) {
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);
}
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
WriteTestData(*Kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);

const auto filler = [&](const ui32 startRes, const ui32 startUid, const ui32 count) {
for (ui32 i = 0; i < count; ++i) {
Expand All @@ -425,39 +437,33 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0);
TInstant start = Now();
ui32 compactionsStart = csController->GetCompactionStartedCounter().Val();
while (Now() - start < TDuration::Seconds(10)) {
if (compactionsStart != csController->GetCompactionStartedCounter().Val()) {
compactionsStart = csController->GetCompactionStartedCounter().Val();
start = Now();
}
Cerr << "WAIT_COMPACTION: " << csController->GetCompactionStartedCounter().Val() << Endl;
Sleep(TDuration::Seconds(1));
}
csController->WaitCompactions(TDuration::Seconds(25));
// important checker for control compactions (<=21) and control indexes constructed (>=21)
AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 21)("count", csController->GetCompactionStartedCounter().Val());
AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 3)("count", csController->GetCompactionStartedCounter().Val());

{
ExecuteSQL(R"(SELECT COUNT(*)
FROM `/Root/olapStore/olapTable`
WHERE resource_id LIKE '%110a151' AND resource_id LIKE '110a%' AND resource_id LIKE '%dd%')", "[[0u;]]");
WHERE resource_id LIKE '%110a151' AND resource_id LIKE '110a%' AND resource_id LIKE '%dd%')",
"[[0u;]]");
AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val());
}
{
ResetZeroLevel(csController);
ExecuteSQL(R"(SELECT COUNT(*)
FROM `/Root/olapStore/olapTable`
WHERE resource_id LIKE '%110a151%')", "[[0u;]]");
WHERE resource_id LIKE '%110a151%')",
"[[0u;]]");
AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart == 3);
}
{
ResetZeroLevel(csController);
ExecuteSQL(R"(SELECT COUNT(*)
FROM `/Root/olapStore/olapTable`
WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222')", "[[0u;]]");
WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222')",
"[[0u;]]");

AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0)("val", csController->GetIndexesSkippedNoData().Val());
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() - ApproveStart < csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
Expand All @@ -479,7 +485,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY((csController->GetIndexesApprovedOnSelect().Val() - ApproveStart) * 5 < csController->GetIndexesSkippingOnSelect().Val() - SkipStart)
AFL_VERIFY((csController->GetIndexesApprovedOnSelect().Val() - ApproveStart) < csController->GetIndexesSkippingOnSelect().Val() - SkipStart)
("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
Expand All @@ -497,8 +503,9 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
// AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
// "approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
// "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
Expand All @@ -514,9 +521,9 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
// AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
// "approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
// "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
{
ResetZeroLevel(csController);
Expand All @@ -532,9 +539,9 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
};
ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
"approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
"skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
// AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart)(
// "approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
// "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
}
};
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2649,7 +2649,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
// Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

{
auto alterQuery = TStringBuilder() <<
Expand Down Expand Up @@ -2734,7 +2733,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
// Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
Expand Down Expand Up @@ -2790,7 +2788,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Indexation);

testHelper.CreateTestOlapTable();
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/ut/olap/sparsed_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

TLocalHelper helper(kikimr);
helper.SetOptionalStorageId(NOlap::NBlobOperations::TGlobal::DefaultStorageId);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ message TCompactionLevelConstructorContainer {
message TZeroLevel {
optional uint32 PortionsLiveDurationSeconds = 1;
optional uint64 ExpectedBlobsSize = 2;
optional uint64 PortionsCountAvailable = 3;
}

oneof Implementation {
Expand Down
1 change: 0 additions & 1 deletion ydb/core/statistics/ut_common/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, bool useRealThreads)

CSController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
CSController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
CSController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

Server->GetRuntime()->SetLogPriority(NKikimrServices::STATISTICS, NActors::NLog::PRI_DEBUG);
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class TChangesWithAppend;
class TCompactColumnEngineChanges;
class TInsertColumnEngineChanges;
class TStoragesManager;
class TRemovePortionsChange;
class TMovePortionsChange;

namespace NReader {
class TTxScan;
Expand Down Expand Up @@ -206,6 +208,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class NOlap::NReader::TTxInternalScan;
friend class NOlap::NReader::NPlain::TIndexScannerConstructor;
friend class NOlap::NReader::NSimple::TIndexScannerConstructor;
friend class NOlap::TRemovePortionsChange;
friend class NOlap::TMovePortionsChange;

class TStoragesManager;
friend class TTxController;
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tx/columnshard/common/limits.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ class TGlobalLimits {
static constexpr inline ui64 ScanMemoryLimit = 3ULL << 30;

static constexpr inline ui64 DefaultBlobsMemoryIntervalLimit = ScanMemoryLimit;
static constexpr inline ui64 DefaultRejectMemoryIntervalLimit = ScanMemoryLimit;
static constexpr inline ui64 DefaultReduceMemoryIntervalLimit = 0.8 * ScanMemoryLimit;
static constexpr inline ui64 DefaultReadSequentiallyBufferSize = ((ui64)8) << 20;
};
}
Loading

0 comments on commit 952590a

Please sign in to comment.