diff --git a/ydb/core/testlib/basics/services.cpp b/ydb/core/testlib/basics/services.cpp index 16ed1d15d96d..560ca56f68c0 100644 --- a/ydb/core/testlib/basics/services.cpp +++ b/ydb/core/testlib/basics/services.cpp @@ -134,6 +134,7 @@ namespace NPDisk { pageCollectionCacheConfig->CacheConfig = new TCacheCacheConfig(caches.Shared, nullptr, nullptr, nullptr); pageCollectionCacheConfig->TotalAsyncQueueInFlyLimit = caches.AsyncQueue; pageCollectionCacheConfig->TotalScanQueueInFlyLimit = caches.ScanQueue; + pageCollectionCacheConfig->Counters = MakeIntrusive(runtime.GetDynamicCounters(nodeIndex)); runtime.AddLocalService(MakeSharedPageCacheId(0), TActorSetupCmd( diff --git a/ydb/core/tx/datashard/datashard__stats.cpp b/ydb/core/tx/datashard/datashard__stats.cpp index 9257faf4df98..aa5de6e1b142 100644 --- a/ydb/core/tx/datashard/datashard__stats.cpp +++ b/ydb/core/tx/datashard/datashard__stats.cpp @@ -116,6 +116,7 @@ class TAsyncTableStatsBuilder : public TActorBootstrappedGetTypeRewrite()) { case TEvDataShard::TEvPeriodicTableStats::EventType: { stats = ev->Get()->Record; - if (stats.GetTableStats().GetPartCount() >= minPartCount) { + if (stats.GetTableStats().GetPartCount() >= minPartCount && stats.GetTableStats().GetRowCount() >= minRows) { captured = true; } break; @@ -44,15 +45,6 @@ Y_UNIT_TEST_SUITE(DataShardStats) { return stats; } - NKikimrTxDataShard::TEvCompactTableResult CompactTable(TTestActorRuntime& runtime, ui64 tabletId, const TPathId& pathId) { - auto sender = runtime.AllocateEdgeActor(); - auto request = MakeHolder(pathId.OwnerId, pathId.LocalPathId); - runtime.SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries()); - - auto ev = runtime.GrabEdgeEventRethrow(sender); - return ev->Get()->Record; - } - NKikimrTableStats::TTableStats GetTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 tableId) { auto sender = runtime.AllocateEdgeActor(); auto request = MakeHolder(tableId); @@ -88,29 +80,27 @@ Y_UNIT_TEST_SUITE(DataShardStats) { InitRoot(server, sender); CreateShardedTable(server, sender, "/Root", "table-1", 1); - auto shards = GetTableShards(server, sender, "/Root/table-1"); - UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"); - TPathId pathId; { Cerr << "... waiting for stats after upsert" << Endl; auto stats = WaitTableStats(runtime); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 704u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u); - pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId()); } - CompactTable(runtime, shards.at(0), pathId); + CompactTable(runtime, shard1, tableId1, false); { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, /* minPartCount */ 1); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + auto stats = WaitTableStats(runtime, 1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 65u); @@ -144,30 +134,27 @@ Y_UNIT_TEST_SUITE(DataShardStats) { .Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false, "hdd"}}) .Families({{.Name = "default", .LogPoolKind = "ssd", .SysLogPoolKind = "ssd", .DataPoolKind = "ssd"}, {.Name = "hdd", .DataPoolKind = "hdd"}}); CreateShardedTable(server, sender, "/Root", "table-1", opts); - - auto shards = GetTableShards(server, sender, "/Root/table-1"); - UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3)"); - TPathId pathId; { Cerr << "... waiting for stats after upsert" << Endl; auto stats = WaitTableStats(runtime); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 752u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u); - pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId()); } - CompactTable(runtime, shards.at(0), pathId); + CompactTable(runtime, shard1, tableId1, false); { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, /* minPartCount */ 1); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + auto stats = WaitTableStats(runtime, 1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 115u); @@ -203,8 +190,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) { InitRoot(server, sender); CreateShardedTable(server, sender, "/Root", "table-1", 1); - auto shards = GetTableShards(server, sender, "/Root/table-1"); - UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); const int count = 2000; TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES "; @@ -215,24 +202,22 @@ Y_UNIT_TEST_SUITE(DataShardStats) { } ExecSQL(server, sender, query); - TPathId pathId; { Cerr << "... waiting for stats after upsert" << Endl; auto stats = WaitTableStats(runtime); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 196096u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u); - pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId()); } - CompactTable(runtime, shards.at(0), pathId); + CompactTable(runtime, shard1, tableId1, false); { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, /* minPartCount */ 1); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + auto stats = WaitTableStats(runtime, 1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 30100u); @@ -244,7 +229,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { } { - auto stats = GetTableStats(runtime, shards.at(0), pathId.LocalPathId); + auto stats = GetTableStats(runtime, shard1, tableId1.PathId.LocalPathId); auto dataSizeHistogram = ReadHistogram(stats.GetDataSizeHistogram()); TVector> expectedDataSizeHistogram = {{475, 7145}, {950, 14290}, {1425, 21435}, {1900, 28580}}; @@ -287,9 +272,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) { .ExternalPoolKind = "ext", .DataThreshold = 100u, .ExternalThreshold = 200u}, {.Name = "hdd", .DataPoolKind = "hdd"}}); CreateShardedTable(server, sender, "/Root", "table-1", opts); - - auto shards = GetTableShards(server, sender, "/Root/table-1"); - UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); TString smallValue(150, 'S'); TString largeValue(1500, 'L'); @@ -300,24 +284,22 @@ Y_UNIT_TEST_SUITE(DataShardStats) { "(4, \"" + largeValue + "\", \"BBB\"), " + "(5, \"CCC\", \"" + largeValue + "\")"); - TPathId pathId; { Cerr << "... waiting for stats after upsert" << Endl; auto stats = WaitTableStats(runtime); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 4232u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u); - pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId()); } - CompactTable(runtime, shards.at(0), pathId); + CompactTable(runtime, shard1, tableId1, false); { Cerr << "... waiting for stats after compaction" << Endl; - auto stats = WaitTableStats(runtime, /* minPartCount */ 1); - UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0)); + auto stats = WaitTableStats(runtime, 1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u); UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 3555u); @@ -336,6 +318,56 @@ Y_UNIT_TEST_SUITE(DataShardStats) { } } + Y_UNIT_TEST(SharedCacheGarbage) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + auto sender = runtime.AllocateEdgeActor(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_TRACE); + + InitRoot(server, sender); + + auto opts = TShardedTableOptions() + .Shards(1) + .Columns({ + {"key", "Uint32", true, false}, + {"value", "String", true, false}}); + CreateShardedTable(server, sender, "/Root", "table-1", opts); + const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); + const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1"); + + const int batches = 10; + const int batchItems = 10; + for (auto batch : xrange(batches)) { + TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES "; + for (auto item = 0; item < batchItems; item++) { + if (item != 0) + query += ", "; + query += "(0, \"" + TString(7000, 'x') + ToString(batch * batchItems + item) + "\") "; + } + Cerr << query << Endl << Endl; + ExecSQL(server, sender, query); + CompactTable(runtime, shard1, tableId1, false); + + Cerr << "... waiting for stats after compaction" << Endl; + auto stats = WaitTableStats(runtime, 1, (batch + 1) * batchItems); + UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), (batch + 1) * batchItems); + UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1); + } + + // each batch ~70KB, ~700KB in total + auto counters = MakeIntrusive(runtime.GetDynamicCounters()); + Cerr << "ActiveBytes = " << counters->ActiveBytes->Val() << " PassiveBytes = " << counters->PassiveBytes->Val() << Endl; + UNIT_ASSERT_LE(counters->ActiveBytes->Val(), 800*1024); // one index + } + } // Y_UNIT_TEST_SUITE(DataShardStats) } // namespace NKikimr