Skip to content

Commit

Permalink
Add a common TBlockEvents helper class for event interception (#7767)
Browse files Browse the repository at this point in the history
Co-authored-by: kungurtsev <alexey66rus@gmail.com>
  • Loading branch information
snaury and kunga authored Aug 14, 2024
1 parent d356543 commit b6537e1
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 53 deletions.
1 change: 1 addition & 0 deletions ydb/core/testlib/actors/block_events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "block_events.h"
82 changes: 82 additions & 0 deletions ydb/core/testlib/actors/block_events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#include "test_runtime.h"

#include <deque>
#include <functional>

namespace NActors {

/**
* Easy blocking for events under the test actor runtime
*
* Matching events are blocked just before they are processed and stashed
* into a deque.
*/
template<class TEvType>
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
public:
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
: Runtime(runtime)
, Condition(std::move(condition))
, Holder(Runtime.AddObserver<TEvType>(
[this](typename TEvType::TPtr& ev) {
this->Process(ev);
}))
{}

/**
* Unblocks up to count events at the front of the deque, allowing them
* to be handled by the destination actor.
*/
TBlockEvents& Unblock(size_t count = -1) {
while (!this->empty() && count > 0) {
auto& ev = this->front();
if (!Stopped) {
IEventHandle* ptr = ev.Get();
UnblockedOnce.insert(ptr);
}
ui32 nodeId = ev->GetRecipientRewrite().NodeId();
ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId();
Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true);
this->pop_front();
--count;
}
return *this;
}

/**
* Stops blocking any new events. Events currently in the deque are
* not unblocked, but may be unblocked at a later time if needed.
*/
TBlockEvents& Stop() {
UnblockedOnce.clear();
Holder.Remove();
Stopped = true;
return *this;
}

private:
void Process(typename TEvType::TPtr& ev) {
IEventHandle* ptr = ev.Get();
auto it = UnblockedOnce.find(ptr);
if (it != UnblockedOnce.end()) {
UnblockedOnce.erase(it);
return;
}

if (Condition && !Condition(ev)) {
return;
}

this->emplace_back(std::move(ev));
}

private:
TTestActorRuntime& Runtime;
std::function<bool(typename TEvType::TPtr&)> Condition;
TTestActorRuntime::TEventObserverHolder Holder;
THashSet<IEventHandle*> UnblockedOnce;
bool Stopped = false;
};


} // namespace NActors
111 changes: 110 additions & 1 deletion ydb/core/testlib/actors/test_runtime_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
Expand Down Expand Up @@ -717,6 +718,114 @@ Y_UNIT_TEST_SUITE(TActorTest) {
runtime.WaitFor("value = 42", [&]{ return value == 42; });
UNIT_ASSERT_VALUES_EQUAL(value, 42);
}
};

Y_UNIT_TEST(TestBlockEvents) {
enum EEv {
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
};

struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
int Value;

TEvTrigger(int value)
: Value(value)
{}
};

class TTargetActor : public TActorBootstrapped<TTargetActor> {
public:
TTargetActor(std::vector<int>* ptr)
: Ptr(ptr)
{}

void Bootstrap() {
Become(&TThis::StateWork);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTrigger, Handle);
}
}

void Handle(TEvTrigger::TPtr& ev) {
Ptr->push_back(ev->Get()->Value);
}

private:
std::vector<int>* Ptr;
};

class TSourceActor : public TActorBootstrapped<TSourceActor> {
public:
TSourceActor(const TActorId& target)
: Target(target)
{}

void Bootstrap() {
Become(&TThis::StateWork);
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvWakeup, Handle);
}
}

void Handle(TEvents::TEvWakeup::TPtr&) {
Send(Target, new TEvTrigger(++Counter));
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}

private:
TActorId Target;
int Counter = 0;
};

TTestActorRuntime runtime(2);
runtime.Initialize(MakeEgg());

std::vector<int> values;
auto target = runtime.Register(new TTargetActor(&values), /* nodeIdx */ 1);
auto source = runtime.Register(new TSourceActor(target), /* nodeIdx */ 1);
runtime.EnableScheduleForActor(source);

TBlockEvents<TEvTrigger> block(runtime, [&](TEvTrigger::TPtr& ev){ return ev->GetRecipientRewrite() == target; });
runtime.WaitFor("blocked 3 events", [&]{ return block.size() >= 3; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 3u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);

block.Unblock(2);
UNIT_ASSERT_VALUES_EQUAL(block.size(), 1u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);

runtime.WaitFor("blocked 1 more event", [&]{ return block.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 1);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 2);
values.clear();

block.Stop();
runtime.WaitFor("processed 2 more events", [&]{ return values.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 5);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 6);
values.clear();

block.Unblock();
UNIT_ASSERT_VALUES_EQUAL(block.size(), 0u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);
runtime.WaitFor("processed 3 more events", [&]{ return values.size() >= 3; });
UNIT_ASSERT_VALUES_EQUAL(values.size(), 3u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 3);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 4);
UNIT_ASSERT_VALUES_EQUAL(values.at(2), 7);
}
}

}
3 changes: 3 additions & 0 deletions ydb/core/testlib/actors/ya.make
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
LIBRARY()

SRCS(
block_events.cpp
block_events.h
test_runtime.cpp
test_runtime.h
)

PEERDIR(
Expand Down
53 changes: 1 addition & 52 deletions ydb/core/tx/datashard/datashard_ut_read_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "read_iterator.h"

#include <ydb/core/testlib/tablet_helpers.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/converter.h>
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
Expand Down Expand Up @@ -4627,58 +4628,6 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorConsistency) {
"result2: " << result2);
}

template<class TEvType>
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
public:
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(typename TEvType::TPtr&)> condition = {})
: Runtime(runtime)
, Condition(std::move(condition))
, Holder(Runtime.AddObserver<TEvType>(
[this](typename TEvType::TPtr& ev) {
this->Process(ev);
}))
{}

TBlockEvents& Unblock(size_t count = -1) {
while (!this->empty() && count > 0) {
auto& ev = this->front();
IEventHandle* ptr = ev.Get();
UnblockedOnce.insert(ptr);
Runtime.Send(ev.Release(), 0, /* viaActorSystem */ true);
this->pop_front();
--count;
}
return *this;
}

void Stop() {
UnblockedOnce.clear();
Holder.Remove();
}

private:
void Process(typename TEvType::TPtr& ev) {
IEventHandle* ptr = ev.Get();
auto it = UnblockedOnce.find(ptr);
if (it != UnblockedOnce.end()) {
UnblockedOnce.erase(it);
return;
}

if (Condition && !Condition(ev)) {
return;
}

this->emplace_back(std::move(ev));
}

private:
TTestActorRuntime& Runtime;
std::function<bool(typename TEvType::TPtr&)> Condition;
TTestActorRuntime::TEventObserverHolder Holder;
THashSet<IEventHandle*> UnblockedOnce;
};

Y_UNIT_TEST(Bug_7674_IteratorDuplicateRows) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
Expand Down

0 comments on commit b6537e1

Please sign in to comment.