KIKIMR-19878 Fix missing shared cache unregister
kunga committed Nov 1, 2023
1 parent 7b1c4ab commit a18f545
Showing 3 changed files with 80 additions and 46 deletions.
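Context for the diff below: TAsyncTableStatsBuilder is a short-lived actor that pulls table data through the shared page cache while it computes stats, but its Die handler only notified the resource broker and never the cache, so the cache apparently kept registration state for actors that were already gone. The fix adds the missing NSharedCache::TEvUnregister send, and the tests are reworked to observe shared-cache counters. A minimal sketch of the resulting shutdown pattern (the concrete override is in datashard__stats.cpp below; the comments are mine, not from the commit):

    // Shutdown pattern this commit completes for TAsyncTableStatsBuilder.
    void Die(const TActorContext& ctx) override {
        // Already present before the fix: tell the resource broker we are gone.
        ctx.Send(MakeResourceBrokerID(), new TEvResourceBroker::TEvNotifyActorDied);
        // The fix: also unregister from the shared page cache so it can drop
        // whatever per-actor state (presumably cached-page ownership) it holds.
        ctx.Send(MakeSharedPageCacheId(), new NSharedCache::TEvUnregister);
        TActorBootstrapped::Die(ctx);
    }

The test changes are mostly mechanical: the local CompactTable helper is dropped in favor of what looks like a shared helper (judging by the call sites, roughly CompactTable(runtime, shardId, tableId, /* compactBorrowed */ false)), WaitTableStats gains a minRows threshold, and a new SharedCacheGarbage test drives repeated compactions while asserting the cache's ActiveBytes stays bounded.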
1 change: 1 addition & 0 deletions ydb/core/testlib/basics/services.cpp
@@ -134,6 +134,7 @@ namespace NPDisk {
pageCollectionCacheConfig->CacheConfig = new TCacheCacheConfig(caches.Shared, nullptr, nullptr, nullptr);
pageCollectionCacheConfig->TotalAsyncQueueInFlyLimit = caches.AsyncQueue;
pageCollectionCacheConfig->TotalScanQueueInFlyLimit = caches.ScanQueue;
+ pageCollectionCacheConfig->Counters = MakeIntrusive<TSharedPageCacheCounters>(runtime.GetDynamicCounters(nodeIndex));

runtime.AddLocalService(MakeSharedPageCacheId(0),
TActorSetupCmd(
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard__stats.cpp
@@ -116,6 +116,7 @@ class TAsyncTableStatsBuilder : public TActorBootstrapped<TAsyncTableStatsBuilde
private:
void Die(const TActorContext& ctx) override {
ctx.Send(MakeResourceBrokerID(), new TEvResourceBroker::TEvNotifyActorDied);
+ ctx.Send(MakeSharedPageCacheId(), new NSharedCache::TEvUnregister);
TActorBootstrapped::Die(ctx);
}

124 changes: 78 additions & 46 deletions ydb/core/tx/datashard/datashard_ut_stats.cpp
@@ -1,5 +1,6 @@
#include "datashard_ut_common.h"
#include "datashard_ut_common_kqp.h"
#include "ydb/core/tablet_flat/shared_sausagecache.h"

namespace NKikimr {

@@ -9,15 +10,15 @@ using namespace Tests;

Y_UNIT_TEST_SUITE(DataShardStats) {

- NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, size_t minPartCount = 0) {
+ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, size_t minPartCount = 0, size_t minRows = 0) {
NKikimrTxDataShard::TEvPeriodicTableStats stats;
bool captured = false;

auto observerFunc = [&](TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
case TEvDataShard::TEvPeriodicTableStats::EventType: {
stats = ev->Get<TEvDataShard::TEvPeriodicTableStats>()->Record;
- if (stats.GetTableStats().GetPartCount() >= minPartCount) {
+ if (stats.GetTableStats().GetPartCount() >= minPartCount && stats.GetTableStats().GetRowCount() >= minRows) {
captured = true;
}
break;
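The added minRows argument lets a test wait for a stats report that meets both thresholds at once; a usage sketch (values illustrative, in the style of the new SharedCacheGarbage test below):

    // Block until some periodic report shows >= 1 part and >= 100 rows.
    auto stats = WaitTableStats(runtime, /* minPartCount */ 1, /* minRows */ 100);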
@@ -44,15 +45,6 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
return stats;
}

- NKikimrTxDataShard::TEvCompactTableResult CompactTable(TTestActorRuntime& runtime, ui64 tabletId, const TPathId& pathId) {
- auto sender = runtime.AllocateEdgeActor();
- auto request = MakeHolder<TEvDataShard::TEvCompactTable>(pathId.OwnerId, pathId.LocalPathId);
- runtime.SendToPipe(tabletId, sender, request.Release(), 0, GetPipeConfigWithRetries());
-
- auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvCompactTableResult>(sender);
- return ev->Get()->Record;
- }
-
NKikimrTableStats::TTableStats GetTableStats(TTestActorRuntime& runtime, ui64 tabletId, ui64 tableId) {
auto sender = runtime.AllocateEdgeActor();
auto request = MakeHolder<TEvDataShard::TEvGetTableStats>(tableId);
@@ -88,29 +80,27 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1);
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");

- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 704u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}

- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);

{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 65u);
@@ -144,30 +134,27 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
.Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}, {"value2", "Uint32", false, false, "hdd"}})
.Families({{.Name = "default", .LogPoolKind = "ssd", .SysLogPoolKind = "ssd", .DataPoolKind = "ssd"}, {.Name = "hdd", .DataPoolKind = "hdd"}});
CreateShardedTable(server, sender, "/Root", "table-1", opts);
-
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value, value2) VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3)");

- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 752u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}

- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);

{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 115u);
@@ -203,8 +190,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
InitRoot(server, sender);

CreateShardedTable(server, sender, "/Root", "table-1", 1);
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");

const int count = 2000;
TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES ";
@@ -215,24 +202,22 @@
}
ExecSQL(server, sender, query);

- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 196096u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}

- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);

{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), count);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 30100u);
@@ -244,7 +229,7 @@
}

{
- auto stats = GetTableStats(runtime, shards.at(0), pathId.LocalPathId);
+ auto stats = GetTableStats(runtime, shard1, tableId1.PathId.LocalPathId);

auto dataSizeHistogram = ReadHistogram(stats.GetDataSizeHistogram());
TVector<std::pair<ui64, ui64>> expectedDataSizeHistogram = {{475, 7145}, {950, 14290}, {1425, 21435}, {1900, 28580}};
@@ -287,9 +272,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
.ExternalPoolKind = "ext", .DataThreshold = 100u, .ExternalThreshold = 200u},
{.Name = "hdd", .DataPoolKind = "hdd"}});
CreateShardedTable(server, sender, "/Root", "table-1", opts);
-
- auto shards = GetTableShards(server, sender, "/Root/table-1");
- UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");

TString smallValue(150, 'S');
TString largeValue(1500, 'L');
@@ -300,24 +284,22 @@
"(4, \"" + largeValue + "\", \"BBB\"), " +
"(5, \"CCC\", \"" + largeValue + "\")");

- TPathId pathId;
{
Cerr << "... waiting for stats after upsert" << Endl;
auto stats = WaitTableStats(runtime);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 0u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 4232u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetIndexSize(), 0u);
- pathId = TPathId(stats.GetTableOwnerId(), stats.GetTableLocalId());
}

- CompactTable(runtime, shards.at(0), pathId);
+ CompactTable(runtime, shard1, tableId1, false);

{
Cerr << "... waiting for stats after compaction" << Endl;
- auto stats = WaitTableStats(runtime, /* minPartCount */ 1);
- UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shards.at(0));
+ auto stats = WaitTableStats(runtime, 1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 5u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1u);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetDataSize(), 3555u);
@@ -336,6 +318,56 @@
}
}

+ Y_UNIT_TEST(SharedCacheGarbage) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_TRACE);
+
+ InitRoot(server, sender);
+
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "String", true, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);
+ const auto tableId1 = ResolveTableId(server, sender, "/Root/table-1");
+
+ const int batches = 10;
+ const int batchItems = 10;
+ for (auto batch : xrange(batches)) {
+ TString query = "UPSERT INTO `/Root/table-1` (key, value) VALUES ";
+ for (auto item = 0; item < batchItems; item++) {
+ if (item != 0)
+ query += ", ";
+ query += "(0, \"" + TString(7000, 'x') + ToString(batch * batchItems + item) + "\") ";
+ }
+ Cerr << query << Endl << Endl;
+ ExecSQL(server, sender, query);
+ CompactTable(runtime, shard1, tableId1, false);
+
+ Cerr << "... waiting for stats after compaction" << Endl;
+ auto stats = WaitTableStats(runtime, 1, (batch + 1) * batchItems);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), (batch + 1) * batchItems);
+ UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
+ }
+
+ // each batch ~70KB, ~700KB in total
+ auto counters = MakeIntrusive<TSharedPageCacheCounters>(runtime.GetDynamicCounters());
+ Cerr << "ActiveBytes = " << counters->ActiveBytes->Val() << " PassiveBytes = " << counters->PassiveBytes->Val() << Endl;
+ UNIT_ASSERT_LE(counters->ActiveBytes->Val(), 800*1024); // one index
+ }
+
} // Y_UNIT_TEST_SUITE(DataShardStats)

} // namespace NKikimr
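Back-of-the-envelope for the ActiveBytes bound in SharedCacheGarbage above (a sketch; the constant names are mine, not from the commit):

    #include <cstddef>

    // Each upserted value is ~7000 'x' characters plus a short numeric suffix.
    constexpr std::size_t RowBytes   = 7000;
    constexpr std::size_t BatchBytes = 10 * RowBytes;    // ~70 KB per batch
    constexpr std::size_t TotalBytes = 10 * BatchBytes;  // ~700 KB of rows overall
    // The test allows 800 KiB: all live data pages plus, per its comment,
    // headroom for one index.
    static_assert(TotalBytes <= 800 * 1024, "bound holds with ~100 KiB to spare");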
