Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move methods to cpp files in actor system code #4940

Merged
merged 2 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions ydb/library/actors/core/actor.cpp

Large diffs are not rendered by default.

160 changes: 7 additions & 153 deletions ydb/library/actors/core/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ namespace NActors {
template <ESendingType SendingType = ESendingType::Common>
static bool Send(std::unique_ptr<IEventHandle> &&ev);

static bool Send(const TActorId& recipient, std::unique_ptr<IEventBase> ev, ui32 flags = 0, ui64 cookie = 0);

template <ESendingType SendingType = ESendingType::Common>
static bool Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient);

Expand Down Expand Up @@ -190,6 +192,10 @@ namespace NActors {
*/
void Schedule(TDuration delta, IEventBase* ev, ISchedulerCookie* cookie = nullptr) const;

void Schedule(TInstant deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TMonotonic deadline, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
void Schedule(TDuration delta, std::unique_ptr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;

TActorContext MakeFor(const TActorId& otherId) const {
return TActorContext(Mailbox, ExecutorThread, EventStart, otherId);
}
Expand Down Expand Up @@ -534,11 +540,7 @@ namespace NActors {
}

protected:
void SetEnoughCpu(bool isEnough) {
if (TlsThreadContext) {
TlsThreadContext->IsEnoughCpu = isEnough;
}
}
void SetEnoughCpu(bool isEnough);

void Describe(IOutputStream&) const noexcept override;
bool Send(TAutoPtr<IEventHandle> ev) const noexcept;
Expand Down Expand Up @@ -834,154 +836,6 @@ namespace NActors {
}
};


template <ESendingType SendingType>
bool TGenericExecutorThread::Send(TAutoPtr<IEventHandle> ev) {
#ifdef USE_ACTOR_CALLSTACK
do {
(ev)->Callstack = TCallstack::GetTlsCallstack();
(ev)->Callstack.Trace();
} while (false)
#endif
Ctx.IncrementSentEvents();
return ActorSystem->Send<SendingType>(ev);
}

template <ESendingType SendingType>
TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId,
TActorId parentId)
{
if (!parentId) {
parentId = CurrentRecipient;
}
if (poolId == Max<ui32>()) {
if constexpr (SendingType == ESendingType::Common) {
return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
} else if (!TlsThreadContext) {
return Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
} else {
ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
TActorId id = Ctx.Executor->Register(actor, mailboxType, ++RevolvingWriteCounter, parentId);
TlsThreadContext->SendingType = previousType;
return id;
}
} else {
return ActorSystem->Register<SendingType>(actor, mailboxType, poolId, ++RevolvingWriteCounter, parentId);
}
}

template <ESendingType SendingType>
TActorId TGenericExecutorThread::RegisterActor(IActor* actor, TMailboxHeader* mailbox, ui32 hint, TActorId parentId) {
if (!parentId) {
parentId = CurrentRecipient;
}
if constexpr (SendingType == ESendingType::Common) {
return Ctx.Executor->Register(actor, mailbox, hint, parentId);
} else if (!TlsActivationContext) {
return Ctx.Executor->Register(actor, mailbox, hint, parentId);
} else {
ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
TActorId id = Ctx.Executor->Register(actor, mailbox, hint, parentId);
TlsThreadContext->SendingType = previousType;
return id;
}
}


template <ESendingType SendingType>
bool TActivationContext::Send(TAutoPtr<IEventHandle> ev) {
return TlsActivationContext->ExecutorThread.Send<SendingType>(ev);
}

template <ESendingType SendingType>
bool TActivationContext::Send(std::unique_ptr<IEventHandle> &&ev) {
return TlsActivationContext->ExecutorThread.Send<SendingType>(ev.release());
}

template <ESendingType SendingType>
bool TActivationContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) {
return Send(IEventHandle::Forward(ev, recipient));
}

template <ESendingType SendingType>
bool TActivationContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) {
return Send(IEventHandle::Forward(ev, recipient));
}

template <ESendingType SendingType>
bool TActorContext::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
return Send<SendingType>(new IEventHandle(recipient, SelfID, ev, flags, cookie, nullptr, std::move(traceId)));
}

template <ESendingType SendingType>
bool TActorContext::Send(TAutoPtr<IEventHandle> ev) const {
return ExecutorThread.Send<SendingType>(ev);
}

template <ESendingType SendingType>
bool TActorContext::Forward(TAutoPtr<IEventHandle>& ev, const TActorId& recipient) const {
return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
}

template <ESendingType SendingType>
bool TActorContext::Forward(THolder<IEventHandle>& ev, const TActorId& recipient) const {
return ExecutorThread.Send<SendingType>(IEventHandle::Forward(ev, recipient));
}

template <ESendingType SendingType>
TActorId TActivationContext::Register(IActor* actor, TActorId parentId, TMailboxType::EType mailboxType, ui32 poolId) {
return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, parentId);
}

template <ESendingType SendingType>
TActorId TActorContext::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const {
return ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfID);
}

template <ESendingType SendingType>
bool TActorIdentity::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
return TActivationContext::Send<SendingType>(new IEventHandle(recipient, *this, ev, flags, cookie, nullptr, std::move(traceId)));
}

template <ESendingType SendingType>
bool IActor::Send(const TActorId& recipient, IEventBase* ev, TEventFlags flags, ui64 cookie, NWilson::TTraceId traceId) const {
return SelfActorId.Send<SendingType>(recipient, ev, flags, cookie, std::move(traceId));
}

template <ESendingType SendingType>
TActorId IActor::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) const noexcept {
Y_ABORT_UNLESS(actor);
return TlsActivationContext->ExecutorThread.RegisterActor<SendingType>(actor, mailboxType, poolId, SelfActorId);
}


template <ESendingType SendingType>
TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool,
ui64 revolvingCounter, const TActorId& parentId) {
Y_ABORT_UNLESS(actor);
Y_ABORT_UNLESS(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
(ui32)executorPool, (ui32)ExecutorPoolCount);
if constexpr (SendingType == ESendingType::Common) {
return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
} else if (!TlsThreadContext) {
return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
} else {
ESendingType previousType = std::exchange(TlsThreadContext->SendingType, SendingType);
TActorId id = CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
TlsThreadContext->SendingType = previousType;
return id;
}
}

template <ESendingType SendingType>
bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
if constexpr (SendingType == ESendingType::Common) {
return this->GenericSend< &IExecutorPool::Send>(ev);
} else {
return this->SpecificSend(ev, SendingType);
}
}

}

template <>
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/actors/core/actor_coroutine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ namespace NActors {
}
}

TActorSystem *TActorCoroImpl::GetActorSystem() const {
return GetActorContext().ExecutorThread.ActorSystem;
}

TActorCoro::~TActorCoro() {
Impl->Destroy();
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/actor_coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ namespace NActors {

protected: // Actor System compatibility section
const TActorContext& GetActorContext() const { return TActivationContext::AsActorContext(); }
TActorSystem *GetActorSystem() const { return GetActorContext().ExecutorThread.ActorSystem; }
TActorSystem *GetActorSystem() const;
TInstant Now() const { return GetActorContext().Now(); }
TMonotonic Monotonic() const { return GetActorContext().Monotonic(); }

Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ namespace NActors {


for (i16 i = 0; i != MaxFullThreadCount; ++i) {
Threads[i].Thread.Reset(
Threads[i].Thread.reset(
new TExecutorThread(
i,
0, // CpuId is not used in BASIC pool
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ namespace NActors {
ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());

for (i16 i = 0; i != PoolThreads; ++i) {
Threads[i].Thread.Reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), PoolName));
Threads[i].Thread.reset(new TExecutorThread(i, 0, actorSystem, this, MailboxTable.Get(), PoolName));
}

*scheduleReaders = &ScheduleQueue->Reader;
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/actors/core/executor_pool_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void TSharedExecutorPool::Prepare(TActorSystem* actorSystem, NSchedulerQueue::TR
for (i16 i = 0; i != SharedThreadCount; ++i) {
// !TODO
Threads[i].ExecutorPools[0].store(dynamic_cast<TBasicExecutorPool*>(poolByThread[i]), std::memory_order_release);
Threads[i].Thread.Reset(
Threads[i].Thread.reset(
new TSharedExecutorThread(
-1,
actorSystem,
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,4 +801,6 @@ namespace NActors {
statsCopy.Aggregate(SharedStats[poolId]);
}

TGenericExecutorThreadCtx::~TGenericExecutorThreadCtx()
{}
}
4 changes: 3 additions & 1 deletion ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace NActors {
};

struct TGenericExecutorThreadCtx {
TAutoPtr<TGenericExecutorThread> Thread;
std::unique_ptr<TGenericExecutorThread> Thread;

protected:
friend class TIOExecutorPool;
Expand All @@ -31,6 +31,8 @@ namespace NActors {
std::atomic<ui64> WaitingFlag = static_cast<ui64>(EThreadState::None);

public:
~TGenericExecutorThreadCtx(); // in executor_thread.cpp

ui64 StartWakingTs = 0;

ui64 GetStateInt() {
Expand Down
Loading