diff --git a/ydb/core/testlib/actors/block_events.h b/ydb/core/testlib/actors/block_events.h index 2b76acf54eca..a267d4f3cc33 100644 --- a/ydb/core/testlib/actors/block_events.h +++ b/ydb/core/testlib/actors/block_events.h @@ -27,7 +27,7 @@ namespace NActors { * Unblocks up to count events at the front of the deque, allowing them * to be handled by the destination actor. */ - TBlockEvents& Unblock(size_t count = -1) { + TBlockEvents& Unblock(size_t count = Max()) { while (!this->empty() && count > 0) { auto& ev = this->front(); if (!Stopped) { @@ -36,6 +36,7 @@ namespace NActors { } ui32 nodeId = ev->GetRecipientRewrite().NodeId(); ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId(); + Cerr << "TBlockEvents::Unblock " << typeid(TEvType).name() << " from " << Runtime.FindActorName(ev->Sender) << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) << Endl; Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true); this->pop_front(); --count; @@ -67,6 +68,7 @@ namespace NActors { return; } + Cerr << "TBlockEvents::Block " << typeid(TEvType).name() << " from " << Runtime.FindActorName(ev->Sender) << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) << Endl; this->emplace_back(std::move(ev)); } diff --git a/ydb/core/tx/datashard/datashard_ut_build_index.cpp b/ydb/core/tx/datashard/datashard_ut_build_index.cpp index 35cb7139e6d6..243721d8a2d2 100644 --- a/ydb/core/tx/datashard/datashard_ut_build_index.cpp +++ b/ydb/core/tx/datashard/datashard_ut_build_index.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -167,11 +168,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) { CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false); - auto observer = runtime.AddObserver([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) { - Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; - if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") { - event.Reset(); - } + TBlockEvents block(runtime, [&](TEvDataShard::TEvCompactBorrowed::TPtr& event) { + return runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR"; }); auto snapshot = CreateVolatileSnapshot(server, { "/Root/table-1" }); diff --git a/ydb/core/tx/datashard/datashard_ut_stats.cpp b/ydb/core/tx/datashard/datashard_ut_stats.cpp index 50f64b8c231c..e5b17cef8c69 100644 --- a/ydb/core/tx/datashard/datashard_ut_stats.cpp +++ b/ydb/core/tx/datashard/datashard_ut_stats.cpp @@ -1,6 +1,7 @@ #include #include #include +#include namespace NKikimr { @@ -441,28 +442,16 @@ Y_UNIT_TEST_SUITE(DataShardStats) { const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); UpsertRows(server, sender); - - bool captured = false; - auto observer = runtime.AddObserver([&](NSharedCache::TEvResult::TPtr& event) { - Cerr << "Captured NSharedCache::TEvResult from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; - if (runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER") { - auto& message = *event->Get(); - event.Reset(static_cast *>( - new IEventHandle(event->Recipient, event->Sender, - new NSharedCache::TEvResult(message.Origin, message.Cookie, NKikimrProto::NODATA)))); - captured = true; - } + + TBlockEvents block(runtime, [&](NSharedCache::TEvResult::TPtr& event) { + return runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER"; }); CompactTable(runtime, shard1, tableId1, false); - for (int i = 0; i < 5 && !captured; ++i) { - TDispatchOptions options; - options.CustomFinalCondition = [&]() { return captured; }; - runtime.DispatchEvents(options, TDuration::Seconds(5)); - } - UNIT_ASSERT(captured); - observer.Remove(); + runtime.WaitFor("blocked read", [&]{ return block.size(); }); + + block.Stop().Unblock(); { Cerr << "Waiting stats.." << Endl;