diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index e9bbca09d821..5c5e68300b7b 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -209,3 +209,14 @@ message TEvStopWorker { optional TControllerIdentity Controller = 1; optional TWorkerIdentity Worker = 2; } + +message TEvWorkerStatus { + enum EStatus { + UNKNOWN = 0; + RUNNING = 1; + STOPPED = 2; + } + + optional TWorkerIdentity Worker = 1; + optional EStatus Status = 2; +} diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 076c5234edcb..37e01c762f4c 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -57,10 +57,12 @@ STFUNC(TController::StateWork) { HFunc(TEvPrivate::TEvResolveSecretResult, Handle); HFunc(TEvPrivate::TEvResolveTenantResult, Handle); HFunc(TEvPrivate::TEvUpdateTenantNodes, Handle); - HFunc(TEvPrivate::TEvRunWorkers, Handle); + HFunc(TEvPrivate::TEvProcessQueues, Handle); + HFunc(TEvPrivate::TEvRemoveWorker, Handle); HFunc(TEvDiscovery::TEvDiscoveryData, Handle); HFunc(TEvDiscovery::TEvError, Handle); HFunc(TEvService::TEvStatus, Handle); + HFunc(TEvService::TEvWorkerStatus, Handle); HFunc(TEvService::TEvRunWorker, Handle); HFunc(TEvInterconnect::TEvNodeDisconnected, Handle); default: @@ -271,13 +273,13 @@ void TController::DeleteSession(ui32 nodeId, const TActorContext& ctx) { worker.ClearSession(); if (worker.HasCommand()) { - WorkersToRun.insert(id); + BootQueue.insert(id); } } Sessions.erase(nodeId); CloseSession(nodeId, ctx); - ScheduleRunWorkers(); + ScheduleProcessQueues(); } void TController::CloseSession(ui32 nodeId, const TActorContext& ctx) { @@ -300,42 +302,57 @@ void TController::Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& c auto& session = Sessions[nodeId]; session.SetReady(); - for (const auto& workerIdentity : ev->Get()->Record.GetWorkers()) { - const auto id = TWorkerId::Parse(workerIdentity); - - auto it = Workers.find(id); - if (it == Workers.end()) { - it = Workers.emplace(id, TWorkerInfo()).first; + for (const auto& protoId : ev->Get()->Record.GetWorkers()) { + const auto id = TWorkerId::Parse(protoId); + if (!IsValidWorker(id)) { + StopQueue.emplace(id, nodeId); + continue; } - auto& worker = it->second; - if (worker.HasSession() && Sessions.contains(worker.GetSession())) { - StopWorker(worker.GetSession(), id); + auto* worker = EnsureWorker(id); + if (worker->HasSession()) { + if (const auto sessionId = worker->GetSession(); sessionId != nodeId) { + Y_ABORT_UNLESS(Sessions.contains(sessionId)); + Sessions[sessionId].DetachWorker(id); + StopQueue.emplace(id, sessionId); + } } session.AttachWorker(id); - worker.AttachSession(nodeId); + worker->AttachSession(nodeId); } + + ScheduleProcessQueues(); } -void TController::StopWorker(ui32 nodeId, const TWorkerId& id) { - LOG_D("Stop worker" - << ": nodeId# " << nodeId - << ", workerId# " << id); +void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); - Y_ABORT_UNLESS(Sessions.contains(nodeId)); - auto& session = Sessions[nodeId]; + const auto nodeId = ev->Sender.NodeId(); + if (!Sessions.contains(nodeId)) { + return; + } - auto ev = MakeHolder(); - auto& record = ev->Record; + const auto& session = Sessions[nodeId]; + const auto& record = ev->Get()->Record; + const auto id = TWorkerId::Parse(record.GetWorker()); - auto& controller = *record.MutableController(); - controller.SetTabletId(TabletID()); - controller.SetGeneration(Executor()->Generation()); - id.Serialize(*record.MutableWorker()); + switch (record.GetStatus()) { + case NKikimrReplication::TEvWorkerStatus::RUNNING: + if (!session.HasWorker(id)) { + StopQueue.emplace(id, nodeId); + } + break; + case NKikimrReplication::TEvWorkerStatus::STOPPED: + MaybeRemoveWorker(id, ctx); + break; + default: + CLOG_W(ctx, "Unknown worker status" + << ": value# " << static_cast(record.GetStatus())); + break; + } - Send(MakeReplicationServiceId(nodeId), std::move(ev)); - session.DetachWorker(id); + ScheduleProcessQueues(); } void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx) { @@ -345,56 +362,108 @@ void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext const auto id = TWorkerId::Parse(record.GetWorker()); auto* cmd = record.MutableCommand(); - auto it = Workers.find(id); - if (it == Workers.end()) { - it = Workers.emplace(id, TWorkerInfo(cmd)).first; + if (!IsValidWorker(id)) { + return; } - auto& worker = it->second; - if (!worker.HasCommand()) { - worker.SetCommand(cmd); + auto* worker = EnsureWorker(id, cmd); + if (!worker->HasCommand()) { + worker->SetCommand(cmd); } - if (!worker.HasSession()) { - WorkersToRun.insert(id); + if (!worker->HasSession()) { + BootQueue.insert(id); + } + + ScheduleProcessQueues(); +} + +bool TController::IsValidWorker(const TWorkerId& id) const { + auto replication = Find(id.ReplicationId()); + if (!replication) { + return false; + } + + if (replication->GetState() != TReplication::EState::Ready) { + return false; + } + + auto* target = replication->FindTarget(id.TargetId()); + if (!target) { + return false; + } + + if (target->GetDstState() != TReplication::EDstState::Ready) { + return false; + } + + if (target->GetStreamState() != TReplication::EStreamState::Ready) { + return false; } - ScheduleRunWorkers(); + return true; } -void TController::ScheduleRunWorkers() { - if (RunWorkersScheduled || !WorkersToRun) { +TWorkerInfo* TController::EnsureWorker(const TWorkerId& id, NKikimrReplication::TRunWorkerCommand* cmd) { + auto it = Workers.find(id); + if (it == Workers.end()) { + it = Workers.emplace(id, cmd).first; + } + + auto replication = Find(id.ReplicationId()); + Y_ABORT_UNLESS(replication); + + auto* target = replication->FindTarget(id.TargetId()); + Y_ABORT_UNLESS(target); + + target->AddWorker(id.WorkerId()); + return &it->second; +} + +void TController::ScheduleProcessQueues() { + if (ProcessQueuesScheduled || (!BootQueue && !StopQueue)) { return; } - Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvRunWorkers()); - RunWorkersScheduled = true; + Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvProcessQueues()); + ProcessQueuesScheduled = true; } -void TController::Handle(TEvPrivate::TEvRunWorkers::TPtr&, const TActorContext& ctx) { - CLOG_D(ctx, "Run workers" - << ": queue# " << WorkersToRun.size()); +void TController::Handle(TEvPrivate::TEvProcessQueues::TPtr&, const TActorContext& ctx) { + CLOG_D(ctx, "Process queues" + << ": boot# " << BootQueue.size() + << ": stop# " << StopQueue.size()); - static constexpr ui32 limit = 100; - ui32 i = 0; + ProcessBootQueue(ctx); + ProcessStopQueue(ctx); + + ProcessQueuesScheduled = false; + ScheduleProcessQueues(); +} - for (auto iter = WorkersToRun.begin(); iter != WorkersToRun.end() && i < limit;) { +void TController::ProcessBootQueue(const TActorContext&) { + ui32 i = 0; + for (auto iter = BootQueue.begin(); iter != BootQueue.end() && i < ProcessBatchLimit;) { const auto id = *iter; + if (!IsValidWorker(id)) { + BootQueue.erase(iter++); + continue; + } auto it = Workers.find(id); - Y_ABORT_UNLESS(it != Workers.end()); + if (it == Workers.end()) { + BootQueue.erase(iter++); + continue; + } auto& worker = it->second; if (worker.HasSession()) { - WorkersToRun.erase(iter++); + BootQueue.erase(iter++); continue; } auto replication = Find(id.ReplicationId()); - if (!replication) { - WorkersToRun.erase(iter++); - continue; - } + Y_ABORT_UNLESS(replication); const auto& tenant = replication->GetTenant(); if (!tenant || !NodesManager.HasTenant(tenant) || !NodesManager.HasNodes(tenant)) { @@ -409,19 +478,16 @@ void TController::Handle(TEvPrivate::TEvRunWorkers::TPtr&, const TActorContext& } Y_ABORT_UNLESS(worker.HasCommand()); - RunWorker(nodeId, id, *worker.GetCommand()); + BootWorker(nodeId, id, *worker.GetCommand()); worker.AttachSession(nodeId); - WorkersToRun.erase(iter++); + BootQueue.erase(iter++); ++i; } - - RunWorkersScheduled = false; - ScheduleRunWorkers(); } -void TController::RunWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd) { - LOG_D("Run worker" +void TController::BootWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd) { + LOG_D("Boot worker" << ": nodeId# " << nodeId << ", workerId# " << id); @@ -441,6 +507,94 @@ void TController::RunWorker(ui32 nodeId, const TWorkerId& id, const NKikimrRepli session.AttachWorker(id); } +void TController::ProcessStopQueue(const TActorContext& ctx) { + ui32 i = 0; + for (auto iter = StopQueue.begin(); iter != StopQueue.end() && i < ProcessBatchLimit;) { + const auto& id = iter->first; + auto sessionId = iter->second; + + if (!Sessions.contains(sessionId) || !Sessions[sessionId].IsReady()) { + MaybeRemoveWorker(id, ctx); + StopQueue.erase(iter++); + continue; + } + + StopWorker(sessionId, id); + + StopQueue.erase(iter++); + ++i; + } +} + +void TController::StopWorker(ui32 nodeId, const TWorkerId& id) { + LOG_D("Stop worker" + << ": nodeId# " << nodeId + << ", workerId# " << id); + + Y_ABORT_UNLESS(Sessions.contains(nodeId)); + auto& session = Sessions[nodeId]; + + auto ev = MakeHolder(); + auto& record = ev->Record; + + auto& controller = *record.MutableController(); + controller.SetTabletId(TabletID()); + controller.SetGeneration(Executor()->Generation()); + id.Serialize(*record.MutableWorker()); + + Send(MakeReplicationServiceId(nodeId), std::move(ev)); + session.DetachWorker(id); +} + +void TController::Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + + const auto& id = ev->Get()->Id; + RemoveQueue.insert(id); + + auto it = Workers.find(id); + if (it == Workers.end()) { + return RemoveWorker(id, ctx); + } + + auto& worker = it->second; + if (!worker.HasSession()) { + return RemoveWorker(id, ctx); + } + + StopQueue.emplace(id, worker.GetSession()); + ScheduleProcessQueues(); +} + +void TController::RemoveWorker(const TWorkerId& id, const TActorContext& ctx) { + LOG_D("Remove worker" + << ", workerId# " << id); + + Y_ABORT_UNLESS(RemoveQueue.contains(id)); + + RemoveQueue.erase(id); + Workers.erase(id); + + auto replication = Find(id.ReplicationId()); + if (!replication) { + return; + } + + auto* target = replication->FindTarget(id.TargetId()); + if (!target) { + return; + } + + target->RemoveWorker(id.WorkerId()); + target->Progress(ctx); +} + +void TController::MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx) { + if (RemoveQueue.contains(id)) { + RemoveWorker(id, ctx); + } +} + void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx) { const ui32 nodeId = ev->Get()->NodeId; @@ -452,7 +606,7 @@ void TController::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const T } } -TReplication::TPtr TController::Find(ui64 id) { +TReplication::TPtr TController::Find(ui64 id) const { auto it = Replications.find(id); if (it == Replications.end()) { return nullptr; @@ -461,7 +615,7 @@ TReplication::TPtr TController::Find(ui64 id) { return it->second; } -TReplication::TPtr TController::Find(const TPathId& pathId) { +TReplication::TPtr TController::Find(const TPathId& pathId) const { auto it = ReplicationsByPathId.find(pathId); if (it == ReplicationsByPathId.end()) { return nullptr; diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index ab7c294c26dc..d0ac57b168fc 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -80,19 +80,27 @@ class TController void Handle(TEvPrivate::TEvResolveSecretResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvResolveTenantResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvUpdateTenantNodes::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPrivate::TEvRunWorkers::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvProcessQueues::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRemoveWorker::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvDiscoveryData::TPtr& ev, const TActorContext& ctx); void Handle(TEvDiscovery::TEvError::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvStatus::TPtr& ev, const TActorContext& ctx); + void Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorContext& ctx); void Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx); void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev, const TActorContext& ctx); void CreateSession(ui32 nodeId, const TActorContext& ctx); void DeleteSession(ui32 nodeId, const TActorContext& ctx); void CloseSession(ui32 nodeId, const TActorContext& ctx); - void ScheduleRunWorkers(); - void RunWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd); + void ScheduleProcessQueues(); + void ProcessBootQueue(const TActorContext& ctx); + void ProcessStopQueue(const TActorContext& ctx); + bool IsValidWorker(const TWorkerId& id) const; + TWorkerInfo* EnsureWorker(const TWorkerId& id, NKikimrReplication::TRunWorkerCommand* cmd = nullptr); + void BootWorker(ui32 nodeId, const TWorkerId& id, const NKikimrReplication::TRunWorkerCommand& cmd); void StopWorker(ui32 nodeId, const TWorkerId& id); + void RemoveWorker(const TWorkerId& id, const TActorContext& ctx); + void MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx); // local transactions class TTxInitSchema; @@ -143,8 +151,8 @@ class TController return replication; } - TReplication::TPtr Find(ui64 id); - TReplication::TPtr Find(const TPathId& pathId); + TReplication::TPtr Find(ui64 id) const; + TReplication::TPtr Find(const TPathId& pathId) const; void Remove(ui64 id); private: @@ -158,9 +166,12 @@ class TController TNodesManager NodesManager; THashMap Sessions; THashMap Workers; - THashSet WorkersToRun; + THashSet BootQueue; + THashSet> StopQueue; + THashSet RemoveQueue; - bool RunWorkersScheduled = false; + bool ProcessQueuesScheduled = false; + static constexpr ui32 ProcessBatchLimit = 100; }; // TController diff --git a/ydb/core/tx/replication/controller/private_events.cpp b/ydb/core/tx/replication/controller/private_events.cpp index ab96dd2afe00..30ce9caabe43 100644 --- a/ydb/core/tx/replication/controller/private_events.cpp +++ b/ydb/core/tx/replication/controller/private_events.cpp @@ -152,6 +152,17 @@ TString TEvPrivate::TEvAlterDstResult::ToString() const { return TStringBuilder() << ToStringHeader() << " {" << ToStringBody() << " }"; } +TEvPrivate::TEvRemoveWorker::TEvRemoveWorker(ui64 rid, ui64 tid, ui64 wid) + : Id(rid, tid, wid) +{ +} + +TString TEvPrivate::TEvRemoveWorker::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Id: " << Id + << " }"; +} + } Y_DECLARE_OUT_SPEC(, NKikimr::NReplication::NController::TEvPrivate::TEvDiscoveryTargetsResult::TAddEntry, stream, value) { diff --git a/ydb/core/tx/replication/controller/private_events.h b/ydb/core/tx/replication/controller/private_events.h index 027e10655d3e..8eb3631dd7b5 100644 --- a/ydb/core/tx/replication/controller/private_events.h +++ b/ydb/core/tx/replication/controller/private_events.h @@ -6,6 +6,7 @@ #include #include #include +#include namespace NKikimr::NReplication::NController { @@ -20,9 +21,10 @@ struct TEvPrivate { EvDropReplication, EvResolveTenantResult, EvUpdateTenantNodes, - EvRunWorkers, + EvProcessQueues, EvResolveSecretResult, EvAlterDstResult, + EvRemoveWorker, EvEnd, }; @@ -160,7 +162,7 @@ struct TEvPrivate { TString ToString() const override; }; - struct TEvRunWorkers: public TEventLocal { + struct TEvProcessQueues: public TEventLocal { }; struct TEvResolveSecretResult: public TEventLocal { @@ -182,6 +184,13 @@ struct TEvPrivate { TString ToString() const override; }; + struct TEvRemoveWorker: public TEventLocal { + const TWorkerId Id; + + explicit TEvRemoveWorker(ui64 rid, ui64 tid, ui64 wid); + TString ToString() const override; + }; + }; // TEvPrivate } diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index 8a1d05f0113a..97ff20365c37 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -73,6 +74,10 @@ class TReplication: public TSimpleRefCount { virtual const TString& GetIssue() const = 0; virtual void SetIssue(const TString& value) = 0; + virtual void AddWorker(ui64 id) = 0; + virtual void RemoveWorker(ui64 id) = 0; + virtual const THashSet& GetWorkers() const = 0; + virtual void Progress(const TActorContext& ctx) = 0; virtual void Shutdown(const TActorContext& ctx) = 0; diff --git a/ydb/core/tx/replication/controller/session_info.cpp b/ydb/core/tx/replication/controller/session_info.cpp index a698c3373380..8dc535834fb5 100644 --- a/ydb/core/tx/replication/controller/session_info.cpp +++ b/ydb/core/tx/replication/controller/session_info.cpp @@ -29,11 +29,19 @@ const THashSet& TSessionInfo::GetWorkers() const { return Workers; } +bool TSessionInfo::HasWorker(const TWorkerId& id) const { + return Workers.contains(id); +} + TWorkerInfo::TWorkerInfo(NKikimrReplication::TRunWorkerCommand* cmd) { SetCommand(cmd); } void TWorkerInfo::SetCommand(NKikimrReplication::TRunWorkerCommand* cmd) { + if (!cmd) { + return; + } + if (!Command) { Command = MakeHolder(); } diff --git a/ydb/core/tx/replication/controller/session_info.h b/ydb/core/tx/replication/controller/session_info.h index f2e3f173f6cf..95d72a646bcd 100644 --- a/ydb/core/tx/replication/controller/session_info.h +++ b/ydb/core/tx/replication/controller/session_info.h @@ -22,6 +22,7 @@ class TSessionInfo { void AttachWorker(const TWorkerId& id); void DetachWorker(const TWorkerId& id); const THashSet& GetWorkers() const; + bool HasWorker(const TWorkerId& id) const; private: bool Ready; @@ -30,8 +31,7 @@ class TSessionInfo { class TWorkerInfo { public: - TWorkerInfo() = default; - explicit TWorkerInfo(NKikimrReplication::TRunWorkerCommand* cmd); + explicit TWorkerInfo(NKikimrReplication::TRunWorkerCommand* cmd = nullptr); void SetCommand(NKikimrReplication::TRunWorkerCommand* cmd); bool HasCommand() const; diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index 63e712048d82..d2ce99fa500a 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -1,6 +1,7 @@ #include "dst_alterer.h" #include "dst_creator.h" #include "dst_remover.h" +#include "private_events.h" #include "target_base.h" #include "util.h" @@ -87,6 +88,27 @@ void TTargetBase::SetIssue(const TString& value) { TruncatedIssue(Issue); } +void TTargetBase::AddWorker(ui64 id) { + Workers.insert(id); +} + +void TTargetBase::RemoveWorker(ui64 id) { + Workers.erase(id); +} + +const THashSet& TTargetBase::GetWorkers() const { + return Workers; +} + +void TTargetBase::RemoveWorkers(const TActorContext& ctx) { + if (!PendingRemoveWorkers) { + PendingRemoveWorkers = true; + for (const auto& id : Workers) { + ctx.Send(ctx.SelfID, new TEvPrivate::TEvRemoveWorker(Replication->GetId(), Id, id)); + } + } +} + void TTargetBase::Progress(const TActorContext& ctx) { switch (DstState) { case EDstState::Creating: @@ -100,14 +122,18 @@ void TTargetBase::Progress(const TActorContext& ctx) { } break; case EDstState::Alter: - if (!DstAlterer) { + if (Workers) { + RemoveWorkers(ctx); + } else if (!DstAlterer) { DstAlterer = ctx.Register(CreateDstAlterer(Replication, Id, ctx)); } break; case EDstState::Done: break; case EDstState::Removing: - if (!DstRemover) { + if (Workers) { + RemoveWorkers(ctx); + } else if (!DstRemover) { DstRemover = ctx.Register(CreateDstRemover(Replication, Id, ctx)); } break; diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h index b9dd993513db..b522065dd253 100644 --- a/ydb/core/tx/replication/controller/target_base.h +++ b/ydb/core/tx/replication/controller/target_base.h @@ -14,6 +14,8 @@ class TTargetBase: public TReplication::ITarget { return Replication; } + void RemoveWorkers(const TActorContext& ctx); + public: explicit TTargetBase(TReplication::TPtr replication, ETargetKind kind, ui64 id, const TString& srcPath, const TString& dstPath); @@ -39,6 +41,10 @@ class TTargetBase: public TReplication::ITarget { const TString& GetIssue() const override; void SetIssue(const TString& value) override; + void AddWorker(ui64 id) override; + void RemoveWorker(ui64 id) override; + const THashSet& GetWorkers() const override; + void Progress(const TActorContext& ctx) override; void Shutdown(const TActorContext& ctx) override; @@ -59,6 +65,8 @@ class TTargetBase: public TReplication::ITarget { TActorId DstAlterer; TActorId DstRemover; TActorId WorkerRegistar; + THashSet Workers; + bool PendingRemoveWorkers = false; }; // TTargetBase diff --git a/ydb/core/tx/replication/controller/target_with_stream.cpp b/ydb/core/tx/replication/controller/target_with_stream.cpp index 114a5bf9f9c7..96145c6783bd 100644 --- a/ydb/core/tx/replication/controller/target_with_stream.cpp +++ b/ydb/core/tx/replication/controller/target_with_stream.cpp @@ -22,7 +22,9 @@ void TTargetWithStream::Progress(const TActorContext& ctx) { } return; case EStreamState::Removing: - if (!StreamRemover) { + if (GetWorkers()) { + RemoveWorkers(ctx); + } else if (!StreamRemover) { StreamRemover = ctx.Register(CreateStreamRemover(replication, GetId(), ctx)); } return; diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp index 9a896e84e059..0a058cc9e0ff 100644 --- a/ydb/core/tx/replication/service/service.cpp +++ b/ydb/core/tx/replication/service/service.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include @@ -49,6 +48,8 @@ class TSessionInfo { void RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor) { auto res = Workers.emplace(id, ops->Register(actor)); Y_ABORT_UNLESS(res.second); + + ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, NKikimrReplication::TEvWorkerStatus::RUNNING)); } void StopWorker(IActorOps* ops, const TWorkerId& id) { @@ -57,6 +58,8 @@ class TSessionInfo { ops->Send(it->second, new TEvents::TEvPoison()); Workers.erase(it); + + ops->Send(ActorId, new TEvService::TEvWorkerStatus(id, NKikimrReplication::TEvWorkerStatus::STOPPED)); } void SendStatus(IActorOps* ops) const { diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h index 88c5b751ed8f..186455aa3f42 100644 --- a/ydb/core/tx/replication/service/service.h +++ b/ydb/core/tx/replication/service/service.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace NKikimr::NReplication { @@ -15,6 +16,7 @@ struct TEvService { EvStatus, EvRunWorker, EvStopWorker, + EvWorkerStatus, EvEnd, }; @@ -41,6 +43,15 @@ struct TEvService { struct TEvStopWorker: public TEventPB { TEvStopWorker() = default; }; + + struct TEvWorkerStatus: public TEventPB { + TEvWorkerStatus() = default; + + explicit TEvWorkerStatus(const TWorkerId& id, NKikimrReplication::TEvWorkerStatus::EStatus status) { + id.Serialize(*Record.MutableWorker()); + Record.SetStatus(status); + } + }; }; namespace NService {