Skip to content

Commit

Permalink
Merge 73be542 into 04968d0
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Sep 6, 2024
2 parents 04968d0 + 73be542 commit 720647d
Show file tree
Hide file tree
Showing 12 changed files with 539 additions and 118 deletions.
1 change: 1 addition & 0 deletions ydb/core/testlib/actors/block_events.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "block_events.h"
90 changes: 90 additions & 0 deletions ydb/core/testlib/actors/block_events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#include "test_runtime.h"

#include <deque>
#include <functional>

namespace NActors {

/**
* Easy blocking for events under the test actor runtime
*
* Matching events are blocked just before they are processed and stashed
* into a deque.
*/
template<class TEvType>
class TBlockEvents : public std::deque<typename TEvType::TPtr> {
public:
TBlockEvents(TTestActorRuntime& runtime, std::function<bool(const typename TEvType::TPtr&)> condition = {})
: Runtime(runtime)
, Condition(std::move(condition))
, Holder(Runtime.AddObserver<TEvType>(
[this](typename TEvType::TPtr& ev) {
this->Process(ev);
}))
{}

/**
* Unblocks up to count events at the front of the deque, allowing them
* to be handled by the destination actor.
*/
TBlockEvents& Unblock(size_t count = Max<size_t>()) {
while (!this->empty() && count > 0) {
auto& ev = this->front();
if (!Stopped) {
IEventHandle* ptr = ev.Get();
UnblockedOnce.insert(ptr);
}
ui32 nodeId = ev->GetRecipientRewrite().NodeId();
ui32 nodeIdx = nodeId - Runtime.GetFirstNodeId();
Cerr << "... unblocking " << (ev->HasEvent() ? TypeName(*ev->GetBase()) : TypeName<TEvType>())
<< " from " << Runtime.FindActorName(ev->Sender)
<< " to " << Runtime.FindActorName(ev->GetRecipientRewrite())
<< Endl;
Runtime.Send(ev.Release(), nodeIdx, /* viaActorSystem */ true);
this->pop_front();
--count;
}
return *this;
}

/**
* Stops blocking any new events. Events currently in the deque are
* not unblocked, but may be unblocked at a later time if needed.
*/
TBlockEvents& Stop() {
UnblockedOnce.clear();
Holder.Remove();
Stopped = true;
return *this;
}

private:
void Process(typename TEvType::TPtr& ev) {
IEventHandle* ptr = ev.Get();
auto it = UnblockedOnce.find(ptr);
if (it != UnblockedOnce.end()) {
UnblockedOnce.erase(it);
return;
}

if (Condition && !Condition(ev)) {
return;
}

Cerr << "... blocking " << (ev->HasEvent() ? TypeName(*ev->GetBase()) : TypeName<TEvType>())
<< " from " << Runtime.FindActorName(ev->Sender)
<< " to " << Runtime.FindActorName(ev->GetRecipientRewrite())
<< Endl;
this->emplace_back(std::move(ev));
}

private:
TTestActorRuntime& Runtime;
std::function<bool(typename TEvType::TPtr&)> Condition;
TTestActorRuntime::TEventObserverHolder Holder;
THashSet<IEventHandle*> UnblockedOnce;
bool Stopped = false;
};


} // namespace NActors
32 changes: 26 additions & 6 deletions ydb/core/testlib/actors/test_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,42 @@ namespace NActors {
void SimulateSleep(TDuration duration);

template<class TResult>
inline TResult WaitFuture(NThreading::TFuture<TResult> f) {
inline TResult WaitFuture(NThreading::TFuture<TResult> f, TDuration simTimeout = TDuration::Max()) {
if (!f.HasValue() && !f.HasException()) {
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return f.HasValue() || f.HasException();
};
options.FinalEvents.emplace_back([&](IEventHandle&) {
return f.HasValue() || f.HasException();
});
// Quirk: non-empty FinalEvents enables full simulation
options.FinalEvents.emplace_back([](IEventHandle&) { return false; });

this->DispatchEvents(options);
this->DispatchEvents(options, simTimeout);

Y_ABORT_UNLESS(f.HasValue() || f.HasException());
}

return f.ExtractValue();
if constexpr (!std::is_same_v<TResult, void>) {
return f.ExtractValue();
} else {
return f.GetValue();
}
}

template<class TCondition>
inline void WaitFor(const TString& description, const TCondition& condition, TDuration simTimeout = TDuration::Max()) {
if (!condition()) {
TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return condition();
};
// Quirk: non-empty FinalEvents enables full simulation
options.FinalEvents.emplace_back([](IEventHandle&) { return false; });

Cerr << "... waiting for " << description << Endl;
this->DispatchEvents(options, simTimeout);

Y_ABORT_UNLESS(condition(), "Timeout while waiting for %s", description.c_str());
}
}

TIntrusivePtr<NKikimr::TMemObserver> GetMemObserver(ui32 nodeIndex = 0) {
Expand Down
206 changes: 205 additions & 1 deletion ydb/core/testlib/actors/test_runtime_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <ydb/core/testlib/actors/test_runtime.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
Expand Down Expand Up @@ -622,6 +623,209 @@ Y_UNIT_TEST_SUITE(TActorTest) {
UNIT_ASSERT_VALUES_EQUAL(event->Get()->Index, 12u);
}
}
};

Y_UNIT_TEST(TestWaitFuture) {
enum EEv {
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
};

struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
TEvTrigger() = default;
};

class TTriggerActor : public TActorBootstrapped<TTriggerActor> {
public:
TTriggerActor(NThreading::TPromise<void> promise)
: Promise(std::move(promise))
{}

void Bootstrap() {
Schedule(TDuration::Seconds(1), new TEvTrigger);
Become(&TThis::StateWork);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTrigger, Handle);
}
}

void Handle(TEvTrigger::TPtr&) {
Promise.SetValue();
PassAway();
}

private:
NThreading::TPromise<void> Promise;
};

TTestActorRuntime runtime;
runtime.Initialize(MakeEgg());

NThreading::TPromise<void> promise = NThreading::NewPromise<void>();
NThreading::TFuture<void> future = promise.GetFuture();

auto actor = runtime.Register(new TTriggerActor(std::move(promise)));
runtime.EnableScheduleForActor(actor);

runtime.WaitFuture(std::move(future));
}

Y_UNIT_TEST(TestWaitFor) {
enum EEv {
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
};

struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
TEvTrigger() = default;
};

class TTriggerActor : public TActorBootstrapped<TTriggerActor> {
public:
TTriggerActor(int* ptr)
: Ptr(ptr)
{}

void Bootstrap() {
Schedule(TDuration::Seconds(1), new TEvTrigger);
Become(&TThis::StateWork);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTrigger, Handle);
}
}

void Handle(TEvTrigger::TPtr&) {
*Ptr = 42;
PassAway();
}

private:
int* Ptr;
};

TTestActorRuntime runtime;
runtime.Initialize(MakeEgg());

int value = 0;
auto actor = runtime.Register(new TTriggerActor(&value));
runtime.EnableScheduleForActor(actor);

runtime.WaitFor("value = 42", [&]{ return value == 42; });
UNIT_ASSERT_VALUES_EQUAL(value, 42);
}

Y_UNIT_TEST(TestBlockEvents) {
enum EEv {
EvTrigger = EventSpaceBegin(TEvents::ES_PRIVATE)
};

struct TEvTrigger : public TEventLocal<TEvTrigger, EvTrigger> {
int Value;

TEvTrigger(int value)
: Value(value)
{}
};

class TTargetActor : public TActorBootstrapped<TTargetActor> {
public:
TTargetActor(std::vector<int>* ptr)
: Ptr(ptr)
{}

void Bootstrap() {
Become(&TThis::StateWork);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvTrigger, Handle);
}
}

void Handle(TEvTrigger::TPtr& ev) {
Ptr->push_back(ev->Get()->Value);
}

private:
std::vector<int>* Ptr;
};

class TSourceActor : public TActorBootstrapped<TSourceActor> {
public:
TSourceActor(const TActorId& target)
: Target(target)
{}

void Bootstrap() {
Become(&TThis::StateWork);
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}

private:
STFUNC(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvWakeup, Handle);
}
}

void Handle(TEvents::TEvWakeup::TPtr&) {
Send(Target, new TEvTrigger(++Counter));
Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}

private:
TActorId Target;
int Counter = 0;
};

TTestActorRuntime runtime(2);
runtime.Initialize(MakeEgg());

std::vector<int> values;
auto target = runtime.Register(new TTargetActor(&values), /* nodeIdx */ 1);
auto source = runtime.Register(new TSourceActor(target), /* nodeIdx */ 1);
runtime.EnableScheduleForActor(source);

TBlockEvents<TEvTrigger> block(runtime, [&](const TEvTrigger::TPtr& ev){ return ev->GetRecipientRewrite() == target; });
runtime.WaitFor("blocked 3 events", [&]{ return block.size() >= 3; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 3u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);

block.Unblock(2);
UNIT_ASSERT_VALUES_EQUAL(block.size(), 1u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);

runtime.WaitFor("blocked 1 more event", [&]{ return block.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 1);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 2);
values.clear();

block.Stop();
runtime.WaitFor("processed 2 more events", [&]{ return values.size() >= 2; });
UNIT_ASSERT_VALUES_EQUAL(block.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 2u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 5);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 6);
values.clear();

block.Unblock();
UNIT_ASSERT_VALUES_EQUAL(block.size(), 0u);
UNIT_ASSERT_VALUES_EQUAL(values.size(), 0u);
runtime.WaitFor("processed 3 more events", [&]{ return values.size() >= 3; });
UNIT_ASSERT_VALUES_EQUAL(values.size(), 3u);
UNIT_ASSERT_VALUES_EQUAL(values.at(0), 3);
UNIT_ASSERT_VALUES_EQUAL(values.at(1), 4);
UNIT_ASSERT_VALUES_EQUAL(values.at(2), 7);
}
}

}
3 changes: 3 additions & 0 deletions ydb/core/testlib/actors/ya.make
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
LIBRARY()

SRCS(
block_events.cpp
block_events.h
test_runtime.cpp
test_runtime.h
)

PEERDIR(
Expand Down
Loading

0 comments on commit 720647d

Please sign in to comment.