Skip to content

Commit

Permalink
Merge 1ab6050 into cac1a93
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored May 16, 2024
2 parents cac1a93 + 1ab6050 commit 24df12d
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 44 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ ydb/core/persqueue/ut TPQTest.*DirectRead*
ydb/core/persqueue/ut TopicAutoscaling.PartitionSplit_ManySession_NewSDK
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
ydb/core/tx/datashard/ut_build_index TTxDataShardBuildIndexScan::ShadowBorrowCompaction
ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables
ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards
ydb/core/tx/tx_proxy/ut_ext_tenant TExtSubDomainTest.CreateTableInsideAndAlterDomainAndTable-AlterDatabaseCreateHiveFirst*
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/kqp/proxy_service/kqp_proxy_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@ Y_UNIT_TEST_SUITE(KqpProxy) {

auto scheduledEvs = [&](TTestActorRuntimeBase& run, TAutoPtr<IEventHandle> &event, TDuration delay, TInstant &deadline) {
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
TActorId actorId = event->GetRecipientRewrite();
IActor *actor = runtime->FindActor(actorId);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_COMPILE_ACTOR) {
Cerr << "Captured TEvents::TSystem::Wakeup from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(event->GetRecipientRewrite()) == "KQP_COMPILE_ACTOR") {
Cerr << "Captured scheduled event for compile actor " << event->Recipient << Endl;
scheduled.push_back(event.Release());
return true;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2365,8 +2365,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {

auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvTxProxySchemeCache::TEvNavigateKeySetResult::EventType) {
IActor* actor = runtime->FindActor(ev->GetRecipientRewrite());
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {
Cerr << "Captured TEvTxProxySchemeCache::TEvNavigateKeySetResult from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(ev->GetRecipientRewrite()) == "KQP_STREAM_LOOKUP_ACTOR") {
if (!firstAttemptToGetData) {
// capture response from scheme cache until CA calls GetAsyncInputData()
captured.push_back(ev.Release());
Expand Down
5 changes: 2 additions & 3 deletions ydb/core/persqueue/ut/common/pq_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ struct TTestContext {

static bool RequestTimeoutFilter(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event, TDuration duration, TInstant& deadline) {
if (event->GetTypeRewrite() == TEvents::TSystem::Wakeup) {
TActorId actorId = event->GetRecipientRewrite();
IActor *actor = runtime.FindActor(actorId);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::PERSQUEUE_ANS_ACTOR) {
Cerr << "Captured TEvents::TSystem::Wakeup from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->GetRecipientRewrite()) == "PERSQUEUE_ANS_ACTOR") {
return true;
}
}
Expand Down
31 changes: 8 additions & 23 deletions ydb/core/tx/datashard/datashard_ut_build_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false);

auto observer = runtime.AddObserver<TEvDataShard::TEvCompactBorrowed>([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) {
IActor *actor = runtime.FindActor(event->Sender);
if (actor && actor->GetActivityType() == 186) {
Cerr << "Ignore SchemeShard TEvCompactBorrowed from " << event->Sender << "(" << actor->GetActivityType() << ")" << " to " << event->Recipient << Endl;
Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") {
event.Reset();
}
});
Expand Down Expand Up @@ -203,12 +202,9 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {

UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), shardIndex == 0 ? 2 : 3);

const auto& ownersProto = stats.GetUserTablePartOwners();
THashSet<ui64> owners(ownersProto.begin(), ownersProto.end());
THashSet<ui64> owners(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
// Note: datashard always adds current shard to part owners, even if there are no parts
UNIT_ASSERT_VALUES_EQUAL(owners.size(), 2u);
UNIT_ASSERT(owners.contains(shards1.at(0)));
UNIT_ASSERT(owners.contains(shards2.at(shardIndex)));
UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards1.at(0), shards2.at(shardIndex)}));

auto tableId = ResolveTableId(server, sender, "/Root/table-2");
auto result = CompactBorrowed(runtime, shards2.at(shardIndex), tableId);
Expand All @@ -217,23 +213,12 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) {
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetOwnerId(), tableId.PathId.OwnerId);
UNIT_ASSERT_VALUES_EQUAL(result.GetPathId().GetLocalId(), tableId.PathId.LocalPathId);

for (int i = 0; i < 5; ++i) {
for (int i = 0; i < 5 && (owners.size() > 1 || owners.contains(shards1.at(0))); ++i) {
auto stats = WaitTableStats(runtime, shards2.at(shardIndex));
// Cerr << "Received shard stats:" << Endl << stats.DebugString() << Endl;
const auto& ownersProto = stats.GetUserTablePartOwners();
THashSet<ui64> owners(ownersProto.begin(), ownersProto.end());
if (i < 4) {
if (owners.size() > 1) {
continue;
}
if (owners.contains(shards1.at(0))) {
continue;
}
}
UNIT_ASSERT_VALUES_EQUAL(owners.size(), 1u);
UNIT_ASSERT(owners.contains(shards2.at(shardIndex)));
Cerr << "OK " << shards2.at(shardIndex) << Endl;
owners = THashSet<ui64>(stats.GetUserTablePartOwners().begin(), stats.GetUserTablePartOwners().end());
}

UNIT_ASSERT_VALUES_EQUAL(owners, (THashSet<ui64>{shards2.at(shardIndex)}));
}

// Alter table: disable shadow data and change compaction policy
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/tx/datashard/datashard_ut_kqp_stream_lookup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) {
bool readReceived = false;
auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
IActor* actor = runtime->FindActor(ev->Sender);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {

Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") {
if (!readReceived) {
auto senderSplit = runtime->AllocateEdgeActor();
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
Expand Down Expand Up @@ -109,9 +108,8 @@ Y_UNIT_TEST_SUITE(KqpStreamLookup) {
bool readReceived = false;
auto captureEvents = [&](TTestActorRuntimeBase &, TAutoPtr <IEventHandle> &ev) {
if (ev->GetTypeRewrite() == TEvDataShard::TEvRead::EventType) {
IActor* actor = runtime->FindActor(ev->Sender);
if (actor && actor->GetActivityType() == NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR) {

Cerr << "Captured TEvDataShard::TEvRead from " << runtime->FindActorName(ev->Sender) << " to " << runtime->FindActorName(ev->GetRecipientRewrite()) << Endl;
if (runtime->FindActorName(ev->Sender) == "KQP_STREAM_LOOKUP_ACTOR") {
if (!readReceived) {
auto senderSplit = runtime->AllocateEdgeActor();
ui64 txId = AsyncSplitTable(server, senderSplit, "/Root/TestTable", shards[0], 500);
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/tx/datashard/datashard_ut_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,8 @@ Y_UNIT_TEST_SUITE(DataShardStats) {

bool captured = false;
auto observer = runtime.AddObserver<NSharedCache::TEvResult>([&](NSharedCache::TEvResult::TPtr& event) {
IActor *actor = runtime.FindActor(event->Recipient);

Cerr << "Got SchemeShard NSharedCache::TEvResult from " << event->Sender << " to " << event->Recipient << "(" << actor->GetActivityType() << ")"<< Endl;

if (actor && actor->GetActivityType() == 288) {
Cerr << "Captured NSharedCache::TEvResult from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl;
if (runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER") {
auto& message = *event->Get();
event.Reset(static_cast<TEventHandle<NSharedCache::TEvResult> *>(
new IEventHandle(event->Recipient, event->Sender,
Expand All @@ -455,6 +452,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
options.CustomFinalCondition = [&]() { return captured; };
runtime.DispatchEvents(options, TDuration::Seconds(5));
}
UNIT_ASSERT(captured);
observer.Remove();

{
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/ut_common/datashard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1754,7 +1754,7 @@ NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runt
auto observer = runtime.AddObserver<TEvDataShard::TEvPeriodicTableStats>([&](auto& ev) {
const auto& record = ev->Get()->Record;
if (record.GetDatashardId() == tabletId) {
Cout << "Got TEvPeriodicTableStats record: PartCount=" << record.GetTableStats().GetPartCount() << ", RowCount=" << record.GetTableStats().GetRowCount() << Endl;
Cerr << "Captured TEvDataShard::TEvPeriodicTableStats " << record.ShortDebugString() << Endl;
if (record.GetTableStats().GetPartCount() >= minPartCount && record.GetTableStats().GetRowCount() >= minRows) {
stats = record;
captured = true;
Expand Down
8 changes: 8 additions & 0 deletions ydb/library/actors/testlib/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,14 @@ namespace NActors {
return FindActor(actorId, node);
}

TStringBuf TTestActorRuntimeBase::FindActorName(const TActorId& actorId, ui32 nodeIndex) const {
auto actor = FindActor(actorId, nodeIndex);
if (!actor) {
return {};
}
return TLocalProcessKeyState<TActorActivityTag>::GetInstance().GetNameByIndex(actor->GetActivityType());
}

void TTestActorRuntimeBase::EnableScheduleForActor(const TActorId& actorId, bool allow) {
TGuard<TMutex> guard(Mutex);
if (allow) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/testlib/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ namespace NActors {
TActorId GetInterconnectProxy(ui32 nodeIndexFrom, ui32 nodeIndexTo);
void BlockOutputForActor(const TActorId& actorId);
IActor* FindActor(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
TStringBuf FindActorName(const TActorId& actorId, ui32 nodeIndex = Max<ui32>()) const;
void EnableScheduleForActor(const TActorId& actorId, bool allow = true);
bool IsScheduleForActorEnabled(const TActorId& actorId) const;
TIntrusivePtr<NMonitoring::TDynamicCounters> GetDynamicCounters(ui32 nodeIndex = 0);
Expand Down

0 comments on commit 24df12d

Please sign in to comment.