From 61f79a29b36e99f132467d18f1079f87d9a3a058 Mon Sep 17 00:00:00 2001 From: kungasc Date: Thu, 16 May 2024 15:18:18 +0000 Subject: [PATCH 1/4] Fix incorrect actor->GetActivityType() usages in tests --- .github/config/muted_ya.txt | 1 - ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 5 ++- ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 4 +-- ydb/core/persqueue/ut/common/pq_ut_common.h | 5 ++- .../tx/datashard/datashard_ut_build_index.cpp | 31 +++++-------------- .../datashard_ut_kqp_stream_lookup.cpp | 10 +++--- ydb/core/tx/datashard/datashard_ut_stats.cpp | 8 ++--- .../ut_common/datashard_ut_common.cpp | 2 +- ydb/library/actors/testlib/test_runtime.cpp | 8 +++++ ydb/library/actors/testlib/test_runtime.h | 1 + 10 files changed, 31 insertions(+), 44 deletions(-) diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 8293f1e40617..41651968e39b 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -31,7 +31,6 @@ ydb/core/persqueue/ut TPQTest.*DirectRead* ydb/core/persqueue/ut TopicAutoscaling.PartitionSplit_ManySession_NewSDK ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium -ydb/core/tx/datashard/ut_build_index TTxDataShardBuildIndexScan.ShadowBorrowCompaction ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards ydb/core/tx/tx_proxy/ut_ext_tenant TExtSubDomainTest.CreateTableInsideAndAlterDomainAndTable-AlterDatabaseCreateHiveFirst* diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index e88af0455f22..43a75aa8b355 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -185,9 +185,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) { auto scheduledEvs = [&](TTestActorRuntimeBase& run, TAutoPtr &event, TDuration delay, TInstant &deadline) { if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) { - TActorId actorId = event->GetRecipientRewrite(); - IActor *actor = runtime->FindActor(actorId); - if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_COMPILE_ACTOR) { + Cerr << "Captured TEvents::TSystem::Wakeup from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl; + if (runtime->FindActorName(event->GetRecipientRewrite()) == "KQP_COMPILE_ACTOR") { Cerr << "Captured scheduled event for compile actor " << event->Recipient << Endl; scheduled.push_back(event.Release()); return true; diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 6a976f4b889c..689c9e1e39e9 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -2365,8 +2365,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr& ev) { if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) { - IActor* actor = runtime->FindActor(ev->GetRecipientRewrite()); - if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) { + Cerr << "Captured TEvTxProxySchemeCache::TEvNavigateKeySetResult from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl; + if (runtime->FindActorName(ev->GetRecipientRewrite()) == "KQP_STREAM_LOOKUP_ACTOR") { if (!firstAttemptToGetData) { // capture response from scheme cache until CA calls GetAsyncInputData() captured.push_back(ev.Release()); diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index ce2336cd66c2..ea195c5ebcd7 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -127,9 +127,8 @@ struct TTestContext { static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr& event, TDuration duration, TInstant& deadline) { if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) { - TActorId actorId = event->GetRecipientRewrite(); - IActor *actor = runtime.FindActor(actorId); - if (actor && actor->GetActivityType() == NKikimrServices::TActivity::PERSQUEUE_ANS_ACTOR) { + Cerr << "Captured TEvents::TSystem::Wakeup from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; + if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") { return true; } } diff --git a/ydb/core/tx/datashard/datashard_ut_build_index.cpp b/ydb/core/tx/datashard/datashard_ut_build_index.cpp index b3fb34a08dbb..35cb7139e6d6 100644 --- a/ydb/core/tx/datashard/datashard_ut_build_index.cpp +++ b/ydb/core/tx/datashard/datashard_ut_build_index.cpp @@ -168,9 +168,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) { CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false); auto observer = runtime.AddObserver([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) { - IActor *actor = runtime.FindActor(event->Sender); - if (actor && actor->GetActivityType() == 186) { - Cerr << "Ignore SchemeShard TEvCompactBorrowed from " << event->Sender << "(" << actor->GetActivityType() << ")" << " to " << event->Recipient << Endl; + Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; + if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") { event.Reset(); } }); @@ -203,12 +202,9 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) { UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), shardIndex == 0 ? 2 : 3); - const auto& ownersProto = stats.GetUserTablePartOwners(); - THashSet owners(ownersProto.begin(), ownersProto.end()); + THashSet owners(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end()); // Note: datashard always adds current shard to part owners, even if there are no parts - UNIT_ASSERT_VALUES_EQUAL(owners.size(), 2u); - UNIT_ASSERT(owners.contains(shards1.at(0))); - UNIT_ASSERT(owners.contains(shards2.at(shardIndex))); + UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet{shards1.at(0), shards2.at(shardIndex)})); auto tableId = ResolveTableId(server, sender, "/Root/table-2"); auto result = CompactBorrowed(runtime, shards2.at(shardIndex), tableId); @@ -217,23 +213,12 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) { UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetOwnerId(), tableId.PathId.OwnerId); UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetLocalId(), tableId.PathId.LocalPathId); - for (int i = 0; i < 5; ++i) { + for (int i = 0; i < 5 && (owners.size() > 1 || owners.contains(shards1.at(0))); ++i) { auto stats = WaitTableStats(runtime, shards2.at(shardIndex)); - // Cerr << "Received shard stats:" << Endl << stats.DebugString() << Endl; - const auto& ownersProto = stats.GetUserTablePartOwners(); - THashSet owners(ownersProto.begin(), ownersProto.end()); - if (i < 4) { - if (owners.size() > 1) { - continue; - } - if (owners.contains(shards1.at(0))) { - continue; - } - } - UNIT_ASSERT_VALUES_EQUAL(owners.size(), 1u); - UNIT_ASSERT(owners.contains(shards2.at(shardIndex))); - Cerr << "OK " << shards2.at(shardIndex) << Endl; + owners = THashSet(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end()); } + + UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet{shards2.at(shardIndex)})); } // Alter table: disable shadow data and change compaction policy diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp index 8bfca964ff61..45541f5e60d2 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp @@ -41,9 +41,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) { bool readReceived = false; auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr &ev) { if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) { - IActor* actor = runtime->FindActor(ev->Sender); - if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) { - + Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl; + if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") { if (!readReceived) { auto senderSplit = runtime->AllocateEdgeActor(); ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500); @@ -109,9 +108,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) { bool readReceived = false; auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr &ev) { if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) { - IActor* actor = runtime->FindActor(ev->Sender); - if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) { - + Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl; + if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") { if (!readReceived) { auto senderSplit = runtime->AllocateEdgeActor(); ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500); diff --git a/ydb/core/tx/datashard/datashard_ut_stats.cpp b/ydb/core/tx/datashard/datashard_ut_stats.cpp index 511176633c85..1737123b5f5a 100644 --- a/ydb/core/tx/datashard/datashard_ut_stats.cpp +++ b/ydb/core/tx/datashard/datashard_ut_stats.cpp @@ -435,11 +435,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) { bool captured = false; auto observer = runtime.AddObserver([&](NSharedCache::TEvResult::TPtr& event) { - IActor *actor = runtime.FindActor(event->Recipient); - - Cerr << "Got SchemeShard NSharedCache::TEvResult from " << event->Sender << " to " << event->Recipient << "(" << actor->GetActivityType() << ")"<< Endl; - - if (actor && actor->GetActivityType() == 288) { + Cerr << "Captured NSharedCache::TEvResult from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; + if (runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER") { auto& message = *event->Get(); event.Reset(static_cast *>( new IEventHandle(event->Recipient, event->Sender, @@ -455,6 +452,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { options.CustomFinalCondition = [&]() { return captured; }; runtime.DispatchEvents(options, TDuration::Seconds(5)); } + UNIT_ASSERT(captured); observer.Remove(); { diff --git a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp index e21f402b54fd..7473631d6890 100644 --- a/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp @@ -1754,7 +1754,7 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runt auto observer = runtime.AddObserver([&](auto& ev) { const auto& record = ev->Get()->Record; if (record.GetDatashardId() == tabletId) { - Cout << "Got TEvPeriodicTableStats record: PartCount=" << record.GetTableStats().GetPartCount() << ", RowCount=" << record.GetTableStats().GetRowCount() << Endl; + Cerr << "Captured TEvDataShard::TEvPeriodicTableStats " << record.ShortDebugString() << Endl; if (record.GetTableStats().GetPartCount() >= minPartCount && record.GetTableStats().GetRowCount() >= minRows) { stats = record; captured = true; diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index c64a332c2b3e..200690ad98fc 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -1558,6 +1558,14 @@ namespace NActors { return FindActor(actorId, node); } + TStringBuf TTestActorRuntimeBase::FindActorName(const TActorId& actorId, ui32 nodeIndex) const { + auto actor = FindActor(actorId, nodeIndex); + if (!actor) { + return {}; + } + return TLocalProcessKeyState::GetInstance().GetNameByIndex(actor->GetActivityType()); + } + void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) { TGuard guard(Mutex); if (allow) { diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index bc8343863087..d6b20ca9a357 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -291,6 +291,7 @@ namespace NActors { TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo); void BlockOutputForActor(const TActorId& actorId); IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max()) const; + TStringBuf FindActorName(const TActorId& actorId, ui32 nodeIndex = Max()) const; void EnableScheduleForActor(const TActorId& actorId, bool allow = true); bool IsScheduleForActorEnabled(const TActorId& actorId) const; TIntrusivePtr GetDynamicCounters(ui32 nodeIndex = 0); From e41cfa2613e55f7d0c39f5f082b5cba6bb357d22 Mon Sep 17 00:00:00 2001 From: kungasc Date: Thu, 16 May 2024 16:41:46 +0000 Subject: [PATCH 2/4] fix build --- ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp index 43a75aa8b355..883ec7d9198e 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp @@ -185,7 +185,7 @@ Y_UNIT_TEST_SUITE(KqpProxy) { auto scheduledEvs = [&](TTestActorRuntimeBase& run, TAutoPtr &event, TDuration delay, TInstant &deadline) { if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) { - Cerr << "Captured TEvents::TSystem::Wakeup from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl; + Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime->FindActorName(event->GetRecipientRewrite()) << Endl; if (runtime->FindActorName(event->GetRecipientRewrite()) == "KQP_COMPILE_ACTOR") { Cerr << "Captured scheduled event for compile actor " << event->Recipient << Endl; scheduled.push_back(event.Release()); From a7d90e19720bb83eb91c6560e45ac475ed53eda3 Mon Sep 17 00:00:00 2001 From: kungasc Date: Thu, 16 May 2024 18:25:14 +0000 Subject: [PATCH 3/4] fix kqp scan test --- ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 689c9e1e39e9..e8bb6f3034b5 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -2364,8 +2364,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { bool firstAttemptToGetData = false; auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr& ev) { - if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) { - Cerr << "Captured TEvTxProxySchemeCache::TEvNavigateKeySetResult from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl; + if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvResolveKeySetResult::EventType) { + Cerr << "Captured TEvTxProxySchemeCache::TEvResolveKeySetResult from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl; if (runtime->FindActorName(ev->GetRecipientRewrite()) == "KQP_STREAM_LOOKUP_ACTOR") { if (!firstAttemptToGetData) { // capture response from scheme cache until CA calls GetAsyncInputData() @@ -2428,13 +2428,13 @@ Y_UNIT_TEST_SUITE(KqpScan) { createTable(createSession(), R"( --!syntax_v1 - CREATE TABLE `/Root/Table` (Key int32, Value int32, PRIMARY KEY(Key)); + CREATE TABLE `/Root/Table` (Key int32, Fk int32, Value int32, PRIMARY KEY(Key), INDEX Index GLOBAL ON (Fk)); )"); server->GetRuntime()->SetEventFilter(captureEvents); sendQuery(R"( - SELECT Value FROM `/Root/Table` WHERE Key IN AsList(1, 2, 3); + SELECT Value FROM `/Root/Table` VIEW Index WHERE Fk IN AsList(1, 2, 3); )"); } From 3f16349e26bf6103dd64fa7503c9426d0bc96283 Mon Sep 17 00:00:00 2001 From: kungasc Date: Thu, 16 May 2024 21:36:29 +0000 Subject: [PATCH 4/4] fix --- ydb/core/persqueue/ut/common/pq_ut_common.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/persqueue/ut/common/pq_ut_common.h b/ydb/core/persqueue/ut/common/pq_ut_common.h index ea195c5ebcd7..876bac14146d 100644 --- a/ydb/core/persqueue/ut/common/pq_ut_common.h +++ b/ydb/core/persqueue/ut/common/pq_ut_common.h @@ -127,7 +127,7 @@ struct TTestContext { static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr& event, TDuration duration, TInstant& deadline) { if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) { - Cerr << "Captured TEvents::TSystem::Wakeup from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; + Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") { return true; }