Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect actor->GetActivityType() usages in tests #4620

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ ydb/core/persqueue/ut TPQTest.*DirectRead*
ydb/core/persqueue/ut TopicAutoscaling.PartitionSplit_ManySession_NewSDK
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
ydb/core/tx/datashard/ut_build_index TTxDataShardBuildIndexScan.ShadowBorrowCompaction
ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables
ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards
ydb/core/tx/tx_proxy/ut_ext_tenant TExtSubDomainTest.CreateTableInsideAndAlterDomainAndTable-AlterDatabaseCreateHiveFirst*
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) {

auto scheduledEvs = [&](TTestActorRuntimeBase& run, TAutoPtr<IEventHandle> &event, TDuration delay, TInstant &deadline) {
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
TActorId actorId = event->GetRecipientRewrite();
IActor *actor = runtime->FindActor(actorId);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_COMPILE_ACTOR) {
Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime->FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(event->GetRecipientRewrite()) == "KQP_COMPILE_ACTOR") {
Cerr << "Captured scheduled event for compile actor " << event->Recipient << Endl;
scheduled.push_back(event.Release());
return true;
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2364,9 +2364,9 @@ Y_UNIT_TEST_SUITE(KqpScan) {
bool firstAttemptToGetData = false;

auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) {
IActor* actor = runtime->FindActor(ev->GetRecipientRewrite());
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvResolveKeySetResult::EventType) {
Cerr << "Captured TEvTxProxySchemeCache::TEvResolveKeySetResult from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(ev->GetRecipientRewrite()) == "KQP_STREAM_LOOKUP_ACTOR") {
if (!firstAttemptToGetData) {
// capture response from scheme cache until CA calls GetAsyncInputData()
captured.push_back(ev.Release());
Expand Down Expand Up @@ -2428,13 +2428,13 @@ Y_UNIT_TEST_SUITE(KqpScan) {

createTable(createSession(), R"(
--!syntax_v1
CREATE TABLE `/Root/Table` (Key int32, Value int32, PRIMARY KEY(Key));
CREATE TABLE `/Root/Table` (Key int32, Fk int32, Value int32, PRIMARY KEY(Key), INDEX Index GLOBAL ON (Fk));
)");

server->GetRuntime()->SetEventFilter(captureEvents);

sendQuery(R"(
SELECT Value FROM `/Root/Table` WHERE Key IN AsList(1, 2, 3);
SELECT Value FROM `/Root/Table` VIEW Index WHERE Fk IN AsList(1, 2, 3);
)");
}

Expand Down
5 changes: 2 additions & 3 deletions ydb/core/persqueue/ut/common/pq_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ struct TTestContext {

static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration duration, TInstant& deadline) {
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
TActorId actorId = event->GetRecipientRewrite();
IActor *actor = runtime.FindActor(actorId);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::PERSQUEUE_ANS_ACTOR) {
Cerr << "Captured TEvents::TSystem::Wakeup to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") {
return true;
}
}
Expand Down
31 changes: 8 additions & 23 deletions ydb/core/tx/datashard/datashard_ut_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false);

auto observer = runtime.AddObserver<TEvDataShard::TEvCompactBorrowed>([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) {
IActor *actor = runtime.FindActor(event->Sender);
if (actor && actor->GetActivityType() == 186) {
Cerr << "Ignore SchemeShard TEvCompactBorrowed from " << event->Sender << "(" << actor->GetActivityType() << ")" << " to " << event->Recipient << Endl;
Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") {
event.Reset();
}
});
Expand Down Expand Up @@ -203,12 +202,9 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {

UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), shardIndex == 0 ? 2 : 3);

const auto& ownersProto = stats.GetUserTablePartOwners();
THashSet<ui64> owners(ownersProto.begin(), ownersProto.end());
THashSet<ui64> owners(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
// Note: datashard always adds current shard to part owners, even if there are no parts
UNIT_ASSERT_VALUES_EQUAL(owners.size(), 2u);
UNIT_ASSERT(owners.contains(shards1.at(0)));
UNIT_ASSERT(owners.contains(shards2.at(shardIndex)));
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards1.at(0), shards2.at(shardIndex)}));

auto tableId = ResolveTableId(server, sender, "/Root/table-2");
auto result = CompactBorrowed(runtime, shards2.at(shardIndex), tableId);
Expand All @@ -217,23 +213,12 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetOwnerId(), tableId.PathId.OwnerId);
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetLocalId(), tableId.PathId.LocalPathId);

for (int i = 0; i < 5; ++i) {
for (int i = 0; i < 5 && (owners.size() > 1 || owners.contains(shards1.at(0))); ++i) {
auto stats = WaitTableStats(runtime, shards2.at(shardIndex));
// Cerr << "Received shard stats:" << Endl << stats.DebugString() << Endl;
const auto& ownersProto = stats.GetUserTablePartOwners();
THashSet<ui64> owners(ownersProto.begin(), ownersProto.end());
if (i < 4) {
if (owners.size() > 1) {
continue;
}
if (owners.contains(shards1.at(0))) {
continue;
}
}
UNIT_ASSERT_VALUES_EQUAL(owners.size(), 1u);
UNIT_ASSERT(owners.contains(shards2.at(shardIndex)));
Cerr << "OK " << shards2.at(shardIndex) << Endl;
owners = THashSet<ui64>(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
}

UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards2.at(shardIndex)}));
}

// Alter table: disable shadow data and change compaction policy
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) {
bool readReceived = false;
auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
IActor* actor = runtime->FindActor(ev->Sender);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {

Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") {
if (!readReceived) {
auto senderSplit = runtime->AllocateEdgeActor();
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
Expand Down Expand Up @@ -109,9 +108,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) {
bool readReceived = false;
auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
IActor* actor = runtime->FindActor(ev->Sender);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {

Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") {
if (!readReceived) {
auto senderSplit = runtime->AllocateEdgeActor();
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/tx/datashard/datashard_ut_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {

bool captured = false;
auto observer = runtime.AddObserver<NSharedCache::TEvResult>([&](NSharedCache::TEvResult::TPtr& event) {
IActor *actor = runtime.FindActor(event->Recipient);

Cerr << "Got SchemeShard NSharedCache::TEvResult from " << event->Sender << " to " << event->Recipient << "(" << actor->GetActivityType() << ")"<< Endl;

if (actor && actor->GetActivityType() == 288) {
Cerr << "Captured NSharedCache::TEvResult from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER") {
auto& message = *event->Get();
event.Reset(static_cast<TEventHandle<NSharedCache::TEvResult> *>(
new IEventHandle(event->Recipient, event->Sender,
Expand All @@ -455,6 +452,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
options.CustomFinalCondition = [&]() { return captured; };
runtime.DispatchEvents(options, TDuration::Seconds(5));
}
UNIT_ASSERT(captured);
observer.Remove();

{
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1754,7 +1754,7 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runt
auto observer = runtime.AddObserver<TEvDataShard::TEvPeriodicTableStats>([&](auto& ev) {
const auto& record = ev->Get()->Record;
if (record.GetDatashardId() == tabletId) {
Cout << "Got TEvPeriodicTableStats record: PartCount=" << record.GetTableStats().GetPartCount() << ", RowCount=" << record.GetTableStats().GetRowCount() << Endl;
Cerr << "Captured TEvDataShard::TEvPeriodicTableStats " << record.ShortDebugString() << Endl;
if (record.GetTableStats().GetPartCount() >= minPartCount && record.GetTableStats().GetRowCount() >= minRows) {
stats = record;
captured = true;
Expand Down
8 changes: 8 additions & 0 deletions ydb/library/actors/testlib/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,14 @@ namespace NActors {
return FindActor(actorId, node);
}

TStringBuf TTestActorRuntimeBase::FindActorName(const TActorId& actorId, ui32 nodeIndex) const {
auto actor = FindActor(actorId, nodeIndex);
if (!actor) {
return {};
}
return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(actor->GetActivityType());
}

void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
TGuard<TMutex> guard(Mutex);
if (allow) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/testlib/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ namespace NActors {
TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
void BlockOutputForActor(const TActorId& actorId);
IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
TStringBuf FindActorName(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
bool IsScheduleForActorEnabled(const TActorId& actorId) const;
TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
Expand Down
Loading