From ea7da6c192541879f5e5b10044adf845aa09124b Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Wed, 14 Aug 2024 10:31:42 +0300 Subject: [PATCH 1/7] Add common WaitFor helper to the test actor runtime (#7725) --- ydb/core/testlib/actors/test_runtime.h | 32 +++++-- ydb/core/testlib/actors/test_runtime_ut.cpp | 95 +++++++++++++++++++++ 2 files changed, 121 insertions(+), 6 deletions(-) diff --git a/ydb/core/testlib/actors/test_runtime.h b/ydb/core/testlib/actors/test_runtime.h index 3016bc441417..d57fcada8d28 100644 --- a/ydb/core/testlib/actors/test_runtime.h +++ b/ydb/core/testlib/actors/test_runtime.h @@ -68,22 +68,42 @@ namespace NActors { void SimulateSleep(TDuration duration); template - inline TResult WaitFuture(NThreading::TFuture f) { + inline TResult WaitFuture(NThreading::TFuture f, TDuration simTimeout = TDuration::Max()) { if (!f.HasValue() && !f.HasException()) { TDispatchOptions options; options.CustomFinalCondition = [&]() { return f.HasValue() || f.HasException(); }; - options.FinalEvents.emplace_back([&](IEventHandle&) { - return f.HasValue() || f.HasException(); - }); + // Quirk: non-empty FinalEvents enables full simulation + options.FinalEvents.emplace_back([](IEventHandle&) { return false; }); - this->DispatchEvents(options); + this->DispatchEvents(options, simTimeout); Y_ABORT_UNLESS(f.HasValue() || f.HasException()); } - return f.ExtractValue(); + if constexpr (!std::is_same_v) { + return f.ExtractValue(); + } else { + return f.GetValue(); + } + } + + template + inline void WaitFor(const TString& description, const TCondition& condition, TDuration simTimeout = TDuration::Max()) { + if (!condition()) { + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + // Quirk: non-empty FinalEvents enables full simulation + options.FinalEvents.emplace_back([](IEventHandle&) { return false; }); + + Cerr << "... waiting for " << description << Endl; + this->DispatchEvents(options, simTimeout); + + Y_ABORT_UNLESS(condition(), "Timeout while waiting for %s", description.c_str()); + } } TIntrusivePtr GetMemObserver(ui32 nodeIndex = 0) { diff --git a/ydb/core/testlib/actors/test_runtime_ut.cpp b/ydb/core/testlib/actors/test_runtime_ut.cpp index d649df72fc89..931176a45642 100644 --- a/ydb/core/testlib/actors/test_runtime_ut.cpp +++ b/ydb/core/testlib/actors/test_runtime_ut.cpp @@ -622,6 +622,101 @@ Y_UNIT_TEST_SUITE(TActorTest) { UNIT_ASSERT_VALUES_EQUAL(event->Get()->Index, 12u); } } + + Y_UNIT_TEST(TestWaitFuture) { + enum EEv { + EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE) + }; + + struct TEvTrigger : public TEventLocal { + TEvTrigger() = default; + }; + + class TTriggerActor : public TActorBootstrapped { + public: + TTriggerActor(NThreading::TPromise promise) + : Promise(std::move(promise)) + {} + + void Bootstrap() { + Schedule(TDuration::Seconds(1), new TEvTrigger); + Become(&TThis::StateWork); + } + + private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTrigger, Handle); + } + } + + void Handle(TEvTrigger::TPtr&) { + Promise.SetValue(); + PassAway(); + } + + private: + NThreading::TPromise Promise; + }; + + TTestActorRuntime runtime; + runtime.Initialize(MakeEgg()); + + NThreading::TPromise promise = NThreading::NewPromise(); + NThreading::TFuture future = promise.GetFuture(); + + auto actor = runtime.Register(new TTriggerActor(std::move(promise))); + runtime.EnableScheduleForActor(actor); + + runtime.WaitFuture(std::move(future)); + } + + Y_UNIT_TEST(TestWaitFor) { + enum EEv { + EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE) + }; + + struct TEvTrigger : public TEventLocal { + TEvTrigger() = default; + }; + + class TTriggerActor : public TActorBootstrapped { + public: + TTriggerActor(int* ptr) + : Ptr(ptr) + {} + + void Bootstrap() { + Schedule(TDuration::Seconds(1), new TEvTrigger); + Become(&TThis::StateWork); + } + + private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTrigger, Handle); + } + } + + void Handle(TEvTrigger::TPtr&) { + *Ptr = 42; + PassAway(); + } + + private: + int* Ptr; + }; + + TTestActorRuntime runtime; + runtime.Initialize(MakeEgg()); + + int value = 0; + auto actor = runtime.Register(new TTriggerActor(&value)); + runtime.EnableScheduleForActor(actor); + + runtime.WaitFor("value = 42", [&]{ return value == 42; }); + UNIT_ASSERT_VALUES_EQUAL(value, 42); + } }; } From 1287219311051b97967808444aac9f80269492ff Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Wed, 14 Aug 2024 14:07:52 +0300 Subject: [PATCH 2/7] Add a common TBlockEvents helper class for event interception (#7767) Co-authored-by: kungurtsev --- ydb/core/testlib/actors/block_events.cpp | 1 + ydb/core/testlib/actors/block_events.h | 82 +++++++++++++ ydb/core/testlib/actors/test_runtime_ut.cpp | 111 +++++++++++++++++- ydb/core/testlib/actors/ya.make | 3 + .../datashard/datashard_ut_read_iterator.cpp | 53 +-------- 5 files changed, 197 insertions(+), 53 deletions(-) create mode 100644 ydb/core/testlib/actors/block_events.cpp create mode 100644 ydb/core/testlib/actors/block_events.h diff --git a/ydb/core/testlib/actors/block_events.cpp b/ydb/core/testlib/actors/block_events.cpp new file mode 100644 index 000000000000..801f1c5bf126 --- /dev/null +++ b/ydb/core/testlib/actors/block_events.cpp @@ -0,0 +1 @@ +#include "block_events.h" diff --git a/ydb/core/testlib/actors/block_events.h b/ydb/core/testlib/actors/block_events.h new file mode 100644 index 000000000000..2b76acf54eca --- /dev/null +++ b/ydb/core/testlib/actors/block_events.h @@ -0,0 +1,82 @@ +#include "test_runtime.h" + +#include +#include + +namespace NActors { + + /** + * Easy blocking for events under the test actor runtime + * + * Matching events are blocked just before they are processed and stashed + * into a deque. + */ + template + class TBlockEvents : public std::deque { + public: + TBlockEvents(TTestActorRuntime& runtime, std::function condition = {}) + : Runtime(runtime) + , Condition(std::move(condition)) + , Holder(Runtime.AddObserver( + [this](typename TEvType::TPtr& ev) { + this->Process(ev); + })) + {} + + /** + * Unblocks up to count events at the front of the deque, allowing them + * to be handled by the destination actor. + */ + TBlockEvents& Unblock(size_t count = -1) { + while (!this->empty() && count > 0) { + auto& ev = this->front(); + if (!Stopped) { + IEventHandle* ptr = ev.Get(); + UnblockedOnce.insert(ptr); + } + ui32 nodeId = ev->GetRecipientRewrite().NodeId(); + ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId(); + Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true); + this->pop_front(); + --count; + } + return *this; + } + + /** + * Stops blocking any new events. Events currently in the deque are + * not unblocked, but may be unblocked at a later time if needed. + */ + TBlockEvents& Stop() { + UnblockedOnce.clear(); + Holder.Remove(); + Stopped = true; + return *this; + } + + private: + void Process(typename TEvType::TPtr& ev) { + IEventHandle* ptr = ev.Get(); + auto it = UnblockedOnce.find(ptr); + if (it != UnblockedOnce.end()) { + UnblockedOnce.erase(it); + return; + } + + if (Condition && !Condition(ev)) { + return; + } + + this->emplace_back(std::move(ev)); + } + + private: + TTestActorRuntime& Runtime; + std::function Condition; + TTestActorRuntime::TEventObserverHolder Holder; + THashSet UnblockedOnce; + bool Stopped = false; + }; + + +} // namespace NActors diff --git a/ydb/core/testlib/actors/test_runtime_ut.cpp b/ydb/core/testlib/actors/test_runtime_ut.cpp index 931176a45642..e36b842df65a 100644 --- a/ydb/core/testlib/actors/test_runtime_ut.cpp +++ b/ydb/core/testlib/actors/test_runtime_ut.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -717,6 +718,114 @@ Y_UNIT_TEST_SUITE(TActorTest) { runtime.WaitFor("value = 42", [&]{ return value == 42; }); UNIT_ASSERT_VALUES_EQUAL(value, 42); } -}; + + Y_UNIT_TEST(TestBlockEvents) { + enum EEv { + EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE) + }; + + struct TEvTrigger : public TEventLocal { + int Value; + + TEvTrigger(int value) + : Value(value) + {} + }; + + class TTargetActor : public TActorBootstrapped { + public: + TTargetActor(std::vector* ptr) + : Ptr(ptr) + {} + + void Bootstrap() { + Become(&TThis::StateWork); + } + + private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTrigger, Handle); + } + } + + void Handle(TEvTrigger::TPtr& ev) { + Ptr->push_back(ev->Get()->Value); + } + + private: + std::vector* Ptr; + }; + + class TSourceActor : public TActorBootstrapped { + public: + TSourceActor(const TActorId& target) + : Target(target) + {} + + void Bootstrap() { + Become(&TThis::StateWork); + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); + } + + private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvWakeup, Handle); + } + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + Send(Target, new TEvTrigger(++Counter)); + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); + } + + private: + TActorId Target; + int Counter = 0; + }; + + TTestActorRuntime runtime(2); + runtime.Initialize(MakeEgg()); + + std::vector values; + auto target = runtime.Register(new TTargetActor(&values), /* nodeIdx */ 1); + auto source = runtime.Register(new TSourceActor(target), /* nodeIdx */ 1); + runtime.EnableScheduleForActor(source); + + TBlockEvents block(runtime, [&](TEvTrigger::TPtr& ev){ return ev->GetRecipientRewrite() == target; }); + runtime.WaitFor("blocked 3 events", [&]{ return block.size() >= 3; }); + UNIT_ASSERT_VALUES_EQUAL(block.size(), 3u); + UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u); + + block.Unblock(2); + UNIT_ASSERT_VALUES_EQUAL(block.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u); + + runtime.WaitFor("blocked 1 more event", [&]{ return block.size() >= 2; }); + UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(values.at(0), 1); + UNIT_ASSERT_VALUES_EQUAL(values.at(1), 2); + values.clear(); + + block.Stop(); + runtime.WaitFor("processed 2 more events", [&]{ return values.size() >= 2; }); + UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u); + UNIT_ASSERT_VALUES_EQUAL(values.at(0), 5); + UNIT_ASSERT_VALUES_EQUAL(values.at(1), 6); + values.clear(); + + block.Unblock(); + UNIT_ASSERT_VALUES_EQUAL(block.size(), 0u); + UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u); + runtime.WaitFor("processed 3 more events", [&]{ return values.size() >= 3; }); + UNIT_ASSERT_VALUES_EQUAL(values.size(), 3u); + UNIT_ASSERT_VALUES_EQUAL(values.at(0), 3); + UNIT_ASSERT_VALUES_EQUAL(values.at(1), 4); + UNIT_ASSERT_VALUES_EQUAL(values.at(2), 7); + } +} } diff --git a/ydb/core/testlib/actors/ya.make b/ydb/core/testlib/actors/ya.make index 25f814605794..9c7caacf445d 100644 --- a/ydb/core/testlib/actors/ya.make +++ b/ydb/core/testlib/actors/ya.make @@ -1,7 +1,10 @@ LIBRARY() SRCS( + block_events.cpp + block_events.h test_runtime.cpp + test_runtime.h ) PEERDIR( diff --git a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp index 2710e08e9b17..14153b3e8912 100644 --- a/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard_ut_read_iterator.cpp @@ -4,6 +4,7 @@ #include "read_iterator.h" #include +#include #include #include #include @@ -4670,58 +4671,6 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) { "result2: " << result2); } - template - class TBlockEvents : public std::deque { - public: - TBlockEvents(TTestActorRuntime& runtime, std::function condition = {}) - : Runtime(runtime) - , Condition(std::move(condition)) - , Holder(Runtime.AddObserver( - [this](typename TEvType::TPtr& ev) { - this->Process(ev); - })) - {} - - TBlockEvents& Unblock(size_t count = -1) { - while (!this->empty() && count > 0) { - auto& ev = this->front(); - IEventHandle* ptr = ev.Get(); - UnblockedOnce.insert(ptr); - Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true); - this->pop_front(); - --count; - } - return *this; - } - - void Stop() { - UnblockedOnce.clear(); - Holder.Remove(); - } - - private: - void Process(typename TEvType::TPtr& ev) { - IEventHandle* ptr = ev.Get(); - auto it = UnblockedOnce.find(ptr); - if (it != UnblockedOnce.end()) { - UnblockedOnce.erase(it); - return; - } - - if (Condition && !Condition(ev)) { - return; - } - - this->emplace_back(std::move(ev)); - } - - private: - TTestActorRuntime& Runtime; - std::function Condition; - TTestActorRuntime::TEventObserverHolder Holder; - THashSet UnblockedOnce; - }; - Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) { TPortManager pm; TServerSettings serverSettings(pm.GetPort(2134)); From a4ad95c9c9c3340599bf447078a25cce0d86f4a7 Mon Sep 17 00:00:00 2001 From: kungurtsev Date: Thu, 15 Aug 2024 15:45:22 +0200 Subject: [PATCH 3/7] Use new test helper TBlockEvents (#7783) --- ydb/core/testlib/actors/block_events.h | 4 ++- .../tx/datashard/datashard_ut_build_index.cpp | 8 +++--- ydb/core/tx/datashard/datashard_ut_stats.cpp | 25 ++++++------------- 3 files changed, 13 insertions(+), 24 deletions(-) diff --git a/ydb/core/testlib/actors/block_events.h b/ydb/core/testlib/actors/block_events.h index 2b76acf54eca..a267d4f3cc33 100644 --- a/ydb/core/testlib/actors/block_events.h +++ b/ydb/core/testlib/actors/block_events.h @@ -27,7 +27,7 @@ namespace NActors { * Unblocks up to count events at the front of the deque, allowing them * to be handled by the destination actor. */ - TBlockEvents& Unblock(size_t count = -1) { + TBlockEvents& Unblock(size_t count = Max()) { while (!this->empty() && count > 0) { auto& ev = this->front(); if (!Stopped) { @@ -36,6 +36,7 @@ namespace NActors { } ui32 nodeId = ev->GetRecipientRewrite().NodeId(); ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId(); + Cerr << "TBlockEvents::Unblock " << typeid(TEvType).name() << " from " << Runtime.FindActorName(ev->Sender) << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) << Endl; Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true); this->pop_front(); --count; @@ -67,6 +68,7 @@ namespace NActors { return; } + Cerr << "TBlockEvents::Block " << typeid(TEvType).name() << " from " << Runtime.FindActorName(ev->Sender) << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) << Endl; this->emplace_back(std::move(ev)); } diff --git a/ydb/core/tx/datashard/datashard_ut_build_index.cpp b/ydb/core/tx/datashard/datashard_ut_build_index.cpp index 35cb7139e6d6..243721d8a2d2 100644 --- a/ydb/core/tx/datashard/datashard_ut_build_index.cpp +++ b/ydb/core/tx/datashard/datashard_ut_build_index.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -167,11 +168,8 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) { CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false); - auto observer = runtime.AddObserver([&](TEvDataShard::TEvCompactBorrowed::TPtr& event) { - Cerr << "Captured TEvDataShard::TEvCompactBorrowed from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; - if (runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR") { - event.Reset(); - } + TBlockEvents block(runtime, [&](TEvDataShard::TEvCompactBorrowed::TPtr& event) { + return runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR"; }); auto snapshot = CreateVolatileSnapshot(server, { "/Root/table-1" }); diff --git a/ydb/core/tx/datashard/datashard_ut_stats.cpp b/ydb/core/tx/datashard/datashard_ut_stats.cpp index 18f19e6b6bcc..a056ef86313c 100644 --- a/ydb/core/tx/datashard/datashard_ut_stats.cpp +++ b/ydb/core/tx/datashard/datashard_ut_stats.cpp @@ -1,6 +1,7 @@ #include #include "ydb/core/tablet_flat/shared_sausagecache.h" #include +#include namespace NKikimr { @@ -441,28 +442,16 @@ Y_UNIT_TEST_SUITE(DataShardStats) { const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0); UpsertRows(server, sender); - - bool captured = false; - auto observer = runtime.AddObserver([&](NSharedCache::TEvResult::TPtr& event) { - Cerr << "Captured NSharedCache::TEvResult from " << runtime.FindActorName(event->Sender) << " to " << runtime.FindActorName(event->GetRecipientRewrite()) << Endl; - if (runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER") { - auto& message = *event->Get(); - event.Reset(static_cast *>( - new IEventHandle(event->Recipient, event->Sender, - new NSharedCache::TEvResult(message.Origin, message.Cookie, NKikimrProto::NODATA)))); - captured = true; - } + + TBlockEvents block(runtime, [&](NSharedCache::TEvResult::TPtr& event) { + return runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER"; }); CompactTable(runtime, shard1, tableId1, false); - for (int i = 0; i < 5 && !captured; ++i) { - TDispatchOptions options; - options.CustomFinalCondition = [&]() { return captured; }; - runtime.DispatchEvents(options, TDuration::Seconds(5)); - } - UNIT_ASSERT(captured); - observer.Remove(); + runtime.WaitFor("blocked read", [&]{ return block.size(); }); + + block.Stop().Unblock(); { Cerr << "Waiting stats.." << Endl; From 39bd6ac9b5ed7b19b7d771d5a67aa4da0c870a1a Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Thu, 22 Aug 2024 12:00:46 +0300 Subject: [PATCH 4/7] Show a nicer demangled name in block/unblock messages (#8138) --- ydb/core/testlib/actors/block_events.h | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ydb/core/testlib/actors/block_events.h b/ydb/core/testlib/actors/block_events.h index a267d4f3cc33..5cbf01ca000e 100644 --- a/ydb/core/testlib/actors/block_events.h +++ b/ydb/core/testlib/actors/block_events.h @@ -36,7 +36,10 @@ namespace NActors { } ui32 nodeId = ev->GetRecipientRewrite().NodeId(); ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId(); - Cerr << "TBlockEvents::Unblock " << typeid(TEvType).name() << " from " << Runtime.FindActorName(ev->Sender) << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) << Endl; + Cerr << "... unblocking " << (ev->HasEvent() ? TypeName(*ev->GetBase()) : TypeName()) + << " from " << Runtime.FindActorName(ev->Sender) + << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) + << Endl; Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true); this->pop_front(); --count; @@ -68,7 +71,10 @@ namespace NActors { return; } - Cerr << "TBlockEvents::Block " << typeid(TEvType).name() << " from " << Runtime.FindActorName(ev->Sender) << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) << Endl; + Cerr << "... blocking " << (ev->HasEvent() ? TypeName(*ev->GetBase()) : TypeName()) + << " from " << Runtime.FindActorName(ev->Sender) + << " to " << Runtime.FindActorName(ev->GetRecipientRewrite()) + << Endl; this->emplace_back(std::move(ev)); } From 347b0c9552b27bfab73ec7d2a81182c99fdee6f4 Mon Sep 17 00:00:00 2001 From: azevaykin <145343289+azevaykin@users.noreply.github.com> Date: Tue, 3 Sep 2024 11:17:58 +0300 Subject: [PATCH 5/7] Add const to the condition function (#8644) --- ydb/core/testlib/actors/block_events.h | 2 +- ydb/core/testlib/actors/test_runtime_ut.cpp | 2 +- ydb/core/tx/datashard/datashard_ut_build_index.cpp | 2 +- ydb/core/tx/datashard/datashard_ut_stats.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/testlib/actors/block_events.h b/ydb/core/testlib/actors/block_events.h index 5cbf01ca000e..f54d525cf6cb 100644 --- a/ydb/core/testlib/actors/block_events.h +++ b/ydb/core/testlib/actors/block_events.h @@ -14,7 +14,7 @@ namespace NActors { template class TBlockEvents : public std::deque { public: - TBlockEvents(TTestActorRuntime& runtime, std::function condition = {}) + TBlockEvents(TTestActorRuntime& runtime, std::function condition = {}) : Runtime(runtime) , Condition(std::move(condition)) , Holder(Runtime.AddObserver( diff --git a/ydb/core/testlib/actors/test_runtime_ut.cpp b/ydb/core/testlib/actors/test_runtime_ut.cpp index e36b842df65a..54cde3af42fd 100644 --- a/ydb/core/testlib/actors/test_runtime_ut.cpp +++ b/ydb/core/testlib/actors/test_runtime_ut.cpp @@ -793,7 +793,7 @@ Y_UNIT_TEST_SUITE(TActorTest) { auto source = runtime.Register(new TSourceActor(target), /* nodeIdx */ 1); runtime.EnableScheduleForActor(source); - TBlockEvents block(runtime, [&](TEvTrigger::TPtr& ev){ return ev->GetRecipientRewrite() == target; }); + TBlockEvents block(runtime, [&](const TEvTrigger::TPtr& ev){ return ev->GetRecipientRewrite() == target; }); runtime.WaitFor("blocked 3 events", [&]{ return block.size() >= 3; }); UNIT_ASSERT_VALUES_EQUAL(block.size(), 3u); UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u); diff --git a/ydb/core/tx/datashard/datashard_ut_build_index.cpp b/ydb/core/tx/datashard/datashard_ut_build_index.cpp index 243721d8a2d2..a15064bb875d 100644 --- a/ydb/core/tx/datashard/datashard_ut_build_index.cpp +++ b/ydb/core/tx/datashard/datashard_ut_build_index.cpp @@ -168,7 +168,7 @@ Y_UNIT_TEST_SUITE(TTxDataShardBuildIndexScan) { CreateShardedTableForIndex(server, sender, "/Root", "table-2", 1, false); - TBlockEvents block(runtime, [&](TEvDataShard::TEvCompactBorrowed::TPtr& event) { + TBlockEvents block(runtime, [&](const TEvDataShard::TEvCompactBorrowed::TPtr& event) { return runtime.FindActorName(event->Sender) == "FLAT_SCHEMESHARD_ACTOR"; }); diff --git a/ydb/core/tx/datashard/datashard_ut_stats.cpp b/ydb/core/tx/datashard/datashard_ut_stats.cpp index a056ef86313c..5b4c8311cba9 100644 --- a/ydb/core/tx/datashard/datashard_ut_stats.cpp +++ b/ydb/core/tx/datashard/datashard_ut_stats.cpp @@ -443,7 +443,7 @@ Y_UNIT_TEST_SUITE(DataShardStats) { UpsertRows(server, sender); - TBlockEvents block(runtime, [&](NSharedCache::TEvResult::TPtr& event) { + TBlockEvents block(runtime, [&](const NSharedCache::TEvResult::TPtr& event) { return runtime.FindActorName(event->GetRecipientRewrite()) == "DATASHARD_STATS_BUILDER"; }); From 591361ff3a7bd1460d6b6b13e11a4a792ef31085 Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Wed, 4 Sep 2024 18:35:09 +0300 Subject: [PATCH 6/7] Make it easier to use IEventHandle as an unfiltered event type in tests (#8723) --- ydb/library/actors/core/event.h | 9 +++- ydb/library/actors/testlib/test_runtime.h | 58 ++++++++++++++++------- 2 files changed, 49 insertions(+), 18 deletions(-) diff --git a/ydb/library/actors/core/event.h b/ydb/library/actors/core/event.h index bb951d0db9af..2478da951b19 100644 --- a/ydb/library/actors/core/event.h +++ b/ydb/library/actors/core/event.h @@ -67,6 +67,9 @@ namespace NActors { } }; + public: + typedef TAutoPtr TPtr; + public: template inline TEv* CastAsLocal() const noexcept { @@ -349,6 +352,10 @@ namespace NActors { template class TEventHandle: public IEventHandle { TEventHandle(); // we never made instance of TEventHandle + + public: + typedef TAutoPtr> TPtr; + public: TEventType* Get() { return IEventHandle::Get(); @@ -371,7 +378,7 @@ namespace NActors { // still abstract typedef TEventHandle THandle; - typedef TAutoPtr TPtr; + typedef typename THandle::TPtr TPtr; }; #define DEFINE_SIMPLE_LOCAL_EVENT(eventType, header) \ diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index 9d83426c0e60..2255feee93aa 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -203,6 +203,36 @@ namespace NActors { } }; + /** + * Allows customizing behavior based on the event type + */ + template + struct TTestEventObserverTraits { + static bool Match(IEventHandle::TPtr& ev) noexcept { + return ev->GetTypeRewrite() == TEvType::EventType; + } + + static typename TEvType::TPtr& Convert(IEventHandle::TPtr& ev) noexcept { + return reinterpret_cast(ev); + } + }; + + template<> + struct TTestEventObserverTraits { + static constexpr bool Match(IEventHandle::TPtr&) noexcept { + return true; + } + + static constexpr IEventHandle::TPtr& Convert(IEventHandle::TPtr& ev) noexcept { + return ev; + } + }; + + template + struct TTestEventObserverTraits> + : public TTestEventObserverTraits + {}; + class TTestActorRuntimeBase: public TNonCopyable { public: class TEdgeActor; @@ -375,24 +405,19 @@ namespace NActors { observerHolder.Remove(); */ - template + template TEventObserverHolder AddObserver(std::function observerFunc) { - auto baseFunc = [observerFunc](TAutoPtr& event) { - if (event && event->GetTypeRewrite() == TEvType::EventType) - observerFunc(*(reinterpret_cast(&event))); + auto baseFunc = [observerFunc](IEventHandle::TPtr& event) { + if (event && TTestEventObserverTraits::Match(event)) { + observerFunc(TTestEventObserverTraits::Convert(event)); + } }; auto iter = ObserverFuncs.insert(ObserverFuncs.end(), baseFunc); return TEventObserverHolder(&ObserverFuncs, std::move(iter)); } - TEventObserverHolder AddObserver(std::function&)> observerFunc) - { - auto iter = ObserverFuncs.insert(ObserverFuncs.end(), observerFunc); - return TEventObserverHolder(&ObserverFuncs, std::move(iter)); - } - template void AppendToLogSettings(NLog::EComponent minVal, NLog::EComponent maxVal, T func) { Y_ABORT_UNLESS(!IsInitialized); @@ -445,15 +470,14 @@ namespace NActors { TDuration simTimeout = TDuration::Max()) { typename TEvent::TPtr handle; - const ui32 eventType = TEvent::EventType; WaitForEdgeEvents([&](TTestActorRuntimeBase& runtime, TAutoPtr& event) { Y_UNUSED(runtime); - if (event->GetTypeRewrite() != eventType) + if (!TTestEventObserverTraits::Match(event)) return false; - typename TEvent::TPtr* typedEvent = reinterpret_cast(&event); - if (predicate(*typedEvent)) { - handle = *typedEvent; + typename TEvent::TPtr& typedEvent = TTestEventObserverTraits::Convert(event); + if (predicate(typedEvent)) { + handle = std::move(typedEvent); return true; } @@ -808,8 +832,8 @@ namespace NActors { const std::function& predicate) { ev.Destroy(); for (auto& event : events) { - if (event && event->GetTypeRewrite() == TEvent::EventType) { - if (predicate(reinterpret_cast(event))) { + if (event && TTestEventObserverTraits::Match(event)) { + if (predicate(TTestEventObserverTraits::Convert(event))) { ev = event; return ev->CastAsLocal(); } From 73be542c76550f9aef36d093a750633547bec470 Mon Sep 17 00:00:00 2001 From: Aleksei Borzenkov Date: Fri, 6 Sep 2024 16:10:23 +0300 Subject: [PATCH 7/7] 24-3: Fix resolved timestamp emitted too early for some displaced upserts --- .../tx/datashard/cdc_stream_heartbeat.cpp | 32 ++-- .../datashard_ut_change_exchange.cpp | 140 +++++++++++++++++- 2 files changed, 154 insertions(+), 18 deletions(-) diff --git a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp index 00396a250977..0c228730c13a 100644 --- a/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp +++ b/ydb/core/tx/datashard/cdc_stream_heartbeat.cpp @@ -95,27 +95,27 @@ void TDataShard::EmitHeartbeats() { return; } + // We may possibly have more writes at this version + TRowVersion edge = GetMvccTxVersion(EMvccTxMode::ReadWrite); + bool wait = true; + if (const auto& plan = TransQueue.GetPlan()) { - const auto version = Min(plan.begin()->ToRowVersion(), VolatileTxManager.GetMinUncertainVersion()); - if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) { - return Execute(new TTxCdcStreamEmitHeartbeats(this, version)); - } - return; + edge = Min(edge, plan.begin()->ToRowVersion()); + wait = false; } if (auto version = VolatileTxManager.GetMinUncertainVersion(); !version.IsMax()) { - if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(version)) { - return Execute(new TTxCdcStreamEmitHeartbeats(this, version)); - } - return; + edge = Min(edge, version); + wait = false; } - const TRowVersion nextWrite = GetMvccTxVersion(EMvccTxMode::ReadWrite); - if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(nextWrite)) { - return Execute(new TTxCdcStreamEmitHeartbeats(this, nextWrite)); + if (CdcStreamHeartbeatManager.ShouldEmitHeartbeat(edge)) { + return Execute(new TTxCdcStreamEmitHeartbeats(this, edge)); } - WaitPlanStep(lowest.Next().Step); + if (wait) { + WaitPlanStep(lowest.Next().Step); + } } void TCdcStreamHeartbeatManager::Reset() { @@ -215,7 +215,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co return false; } - if (Schedule.top().Version > edge) { + if (Schedule.top().Version >= edge) { return false; } @@ -225,7 +225,7 @@ bool TCdcStreamHeartbeatManager::ShouldEmitHeartbeat(const TRowVersion& edge) co THashMap TCdcStreamHeartbeatManager::EmitHeartbeats( NTable::TDatabase& db, const TRowVersion& edge) { - if (Schedule.empty() || Schedule.top().Version > edge) { + if (!ShouldEmitHeartbeat(edge)) { return {}; } @@ -234,7 +234,7 @@ THashMap TCdcStreamHeartbea while (true) { const auto& top = Schedule.top(); - if (top.Version > edge) { + if (top.Version >= edge) { break; } diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 7664fff85a8d..4991860291da 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) { return result; } - void WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector& expected) { + TVector WaitForContent(TServer::TPtr server, const TActorId& sender, const TString& path, const TVector& expected) { while (true) { const auto records = GetRecords(*server->GetRuntime(), sender, path, 0); for (ui32 i = 0; i < std::min(records.size(), expected.size()); ++i) { @@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) { if (records.size() >= expected.size()) { UNIT_ASSERT_VALUES_EQUAL_C(records.size(), expected.size(), "Unexpected record: " << records.at(expected.size()).second); - break; + TVector values; + for (const auto& pr : records) { + bool ok = NJson::ReadJsonTree(pr.second, &values.emplace_back()); + Y_ABORT_UNLESS(ok); + } + return values; } SimulateSleep(server, TDuration::Seconds(1)); @@ -3692,6 +3698,136 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST(ResolvedTimestampForDisplacedUpsert) { + TPortManager portManager; + TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig()) + .SetUseRealThreads(false) + .SetDomainName("Root") + ); + + TDisableDataShardLogBatching disableDataShardLogBatching; + + auto& runtime = *server->GetRuntime(); + const auto edgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(server, edgeActor); + SetSplitMergePartCountLimit(&runtime, -1); + CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable()); + + WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table", + WithVirtualTimestamps(WithResolvedTimestamps( + TDuration::Seconds(3), Updates(NKikimrSchemeOp::ECdcStreamFormatJson))))); + + Cerr << "... prepare" << Endl; + WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + }); + + KqpSimpleExec(runtime, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10); + )"); + + auto records = WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1],"ts":"***"})", + R"({"resolved":"***"})", + }); + + // Take the final step + ui64 lastStep = records.back()["resolved"][0].GetUInteger(); + Cerr << "... last heartbeat at " << lastStep << Endl; + + const auto tableId = ResolveTableId(server, edgeActor, "/Root/Table"); + const auto shards = GetTableShards(server, edgeActor, "/Root/Table"); + UNIT_ASSERT_VALUES_EQUAL(shards.size(), 1u); + + ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain); + ui64 snapshotStep = lastStep + 3000 - 1; + ForwardToTablet(runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps(coordinator, snapshotStep)); + + TBlockEvents blockedUpdates(runtime, + [&](auto& ev) { + return ev->Get()->Record.GetTimeBarrier() > snapshotStep; + }); + + Cerr << "... performing a read from snapshot just before the next heartbeat" << Endl; + { + auto req = std::make_unique(); + { + auto& record = req->Record; + record.SetReadId(1); + record.MutableTableId()->SetOwnerId(tableId.PathId.OwnerId); + record.MutableTableId()->SetTableId(tableId.PathId.LocalPathId); + record.AddColumns(1); + record.AddColumns(2); + record.SetResultFormat(NKikimrDataEvents::FORMAT_CELLVEC); + ui32 key = 1; + TVector keys; + keys.push_back(TCell::Make(key)); + req->Keys.push_back(TSerializedCellVec(TSerializedCellVec::Serialize(keys))); + record.MutableSnapshot()->SetStep(snapshotStep); + record.MutableSnapshot()->SetTxId(Max()); + } + ForwardToTablet(runtime, shards.at(0), edgeActor, req.release()); + auto ev = runtime.GrabEdgeEventRethrow(edgeActor); + auto* res = ev->Get(); + UNIT_ASSERT_VALUES_EQUAL(res->Record.GetStatus().GetCode(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(res->Record.GetFinished(), true); + Cerr << "... read finished" << Endl; + } + for (int i = 0; i < 10; ++i) { + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + } + + Cerr << "... starting upsert 1 (expected to displace)" << Endl; + auto upsert1 = KqpSimpleSend(runtime, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20); + )"); + for (int i = 0; i < 10; ++i) { + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + } + + Cerr << "... starting upsert 2 (expected to displace)" << Endl; + auto upsert2 = KqpSimpleSend(runtime, R"( + UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30); + )"); + for (int i = 0; i < 10; ++i) { + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + } + + Cerr << "... unblocking updates" << Endl; + blockedUpdates.Unblock().Stop(); + for (int i = 0; i < 10; ++i) { + runtime.SimulateSleep(TDuration::MilliSeconds(1)); + } + + Cerr << "... checking the update is logged before the new resolved timestamp" << Endl; + records = WaitForContent(server, edgeActor, "/Root/Table/Stream", { + R"({"resolved":"***"})", + R"({"update":{"value":10},"key":[1],"ts":"***"})", + R"({"resolved":"***"})", + R"({"update":{"value":20},"key":[2],"ts":"***"})", + R"({"update":{"value":30},"key":[3],"ts":"***"})", + R"({"resolved":"***"})", + }); + + TRowVersion resolved(0, 0); + for (auto& record : records) { + if (record.Has("resolved")) { + resolved.Step = record["resolved"][0].GetUInteger(); + resolved.TxId = record["resolved"][1].GetUInteger(); + } + if (record.Has("ts")) { + TRowVersion ts( + record["ts"][0].GetUInteger(), + record["ts"][1].GetUInteger()); + UNIT_ASSERT_C(resolved < ts, + "Record with ts " << ts << " after resolved " << resolved); + } + } + } + } // Cdc } // NKikimr